diff --git a/Dockerfile b/Dockerfile
index 35e64db..b51ef49 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM apache/airflow:2.8.2-python3.8
+FROM apache/airflow:2.8.2-python3.10
 ENV PYTHONBUFFERED=0
 ENV AIRFLOW__LOGGING__LOGGING_LEVEL=INFO
diff --git a/README.md b/README.md
index c5a5bf3..c8e2782 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,88 @@
-# bi-dags
+# BI-DAGs Setup Guide
+
+This README provides a step-by-step guide on setting up your environment for running BI-DAGs with Airflow.
+
+## Prerequisites
+
+Before you begin, ensure you have `pyenv` installed on your system. If you don't have `pyenv` installed, please follow the instructions [here](https://github.com/pyenv/pyenv#installation).
+
+## Installation Steps
+
+### 1. Set Up Python Environment
+
+First, we'll set up a Python environment using `pyenv`.
+
+```sh
+# Define the desired Python version
+export PYTHON_VERSION=3.10.11
+
+# Install the specified Python version using pyenv
+pyenv install $PYTHON_VERSION
+
+# Set the global Python version to the installed one
+pyenv global $PYTHON_VERSION
+
+# Create a virtual environment named 'bi-dags'
+pyenv virtualenv $PYTHON_VERSION bi-dags
+
+# Activate the virtual environment
+pyenv activate bi-dags
+```
+
+### 2. Navigate to Project Directory
+
+Change your current working directory to `bi-dags`.
+
+```sh
+cd bi-dags
+```
+
+### 3. Install Dependencies
+
+With your virtual environment activated, install the necessary dependencies.
+
+```sh
+pip install -r requirements.txt
+```
+
+### 4. Set Airflow Home
+
+Configure the Airflow home environment variable.
+
+```sh
+export AIRFLOW_HOME=$PWD
+```
+
+### 5. Start Airflow
+
+Initialize and start Airflow using the standalone command.
+
+```sh
+airflow standalone
+```
+
+### 6. Start Postgres with Docker Compose
+
+If you're using Docker to manage your Postgres database, start the service.
+
+```sh
+docker-compose start
+```
+
+### 7. Add Airflow Connections via UI
+
+Lastly, add the necessary Airflow connections through the UI.
+
+- Navigate to Admin -> Connections in the Airflow UI.
+- Click on "Add" and fill in the details:
+  - Connection Id: `superset_qa`
+  - Login: `airflow`
+  - Database: `airflow`
+  - Password: `airflow`
+  - Host: `localhost`
+  - Port: `5432`
+  - Connection Type: `postgres`
+
+More information on how to manage DB connections can be found [here](https://airflow.apache.org/docs/apache-airflow/2.8.2/howto/connection.html).
+
+After completing these steps, your environment should be set up and ready for running BI-DAGs with Airflow.
diff --git a/dags/migrations/alembic.ini b/dags/migrations/alembic.ini
new file mode 100644
index 0000000..6b83140
--- /dev/null
+++ b/dags/migrations/alembic.ini
@@ -0,0 +1,115 @@
+# A generic, single database configuration.
+
+[alembic]
+# path to migration scripts
+script_location = .
+
+# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
+# Uncomment the line below if you want the files to be prepended with date and time
+# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
+# for all available tokens
+# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
+
+# sys.path path, will be prepended to sys.path if present.
+# defaults to the current working directory.
+prepend_sys_path = .
+
+# timezone to use when rendering the date within the migration file
+# as well as the filename.
+# If specified, requires the python>=3.9 or backports.zoneinfo library.
+# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
+# string value is passed to ZoneInfo()
+# leave blank for localtime
+# timezone =
+
+# max length of characters to apply to the
+# "slug" field
+# truncate_slug_length = 40
+
+# set to 'true' to run the environment during
+# the 'revision' command, regardless of autogenerate
+# revision_environment = false
+
+# set to 'true' to allow .pyc and .pyo files without
+# a source .py file to be detected as revisions in the
+# versions/ directory
+# sourceless = false
+
+# version location specification; This defaults
+# to ./versions. When using multiple version
+# directories, initial revisions must be specified with --version-path.
+# The path separator used here should be the separator specified by "version_path_separator" below.
+# version_locations = %(here)s/bar:%(here)s/bat:./versions
+
+# version path separator; As mentioned above, this is the character used to split
+# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
+# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
+# Valid values for version_path_separator are:
+#
+# version_path_separator = :
+# version_path_separator = ;
+# version_path_separator = space
+version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
+
+# set to 'true' to search source files recursively
+# in each "version_locations" directory
+# new in Alembic version 1.10
+# recursive_version_locations = false
+
+# the output encoding used when revision files
+# are written from script.py.mako
+# output_encoding = utf-8
+
+sqlalchemy.url = driver://user:pass@localhost/dbname
+
+[post_write_hooks]
+# post_write_hooks defines scripts or Python functions that are run
+# on newly generated revision scripts. See the documentation for further
+# detail and examples
+
+# format using "black" - use the console_scripts runner, against the "black" entrypoint
+# hooks = black
+# black.type = console_scripts
+# black.entrypoint = black
+# black.options = -l 79 REVISION_SCRIPT_FILENAME
+
+# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
+# hooks = ruff
+# ruff.type = exec
+# ruff.executable = %(here)s/.venv/bin/ruff
+# ruff.options = --fix REVISION_SCRIPT_FILENAME
+
+# Logging configuration
+[loggers]
+keys = root,sqlalchemy,alembic
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = WARN
+handlers = console
+qualname =
+
+[logger_sqlalchemy]
+level = WARN
+handlers =
+qualname = sqlalchemy.engine
+
+[logger_alembic]
+level = INFO
+handlers =
+qualname = alembic
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(levelname)-5.5s [%(name)s] %(message)s
+datefmt = %H:%M:%S
diff --git a/dags/migrations/env.py b/dags/migrations/env.py
new file mode 100644
index 0000000..9dd6c6c
--- /dev/null
+++ b/dags/migrations/env.py
@@ -0,0 +1,74 @@
+from logging.config import fileConfig
+
+from alembic import context
+from sqlalchemy import engine_from_config, pool
+
+# this is the Alembic Config object, which provides
+# access to the values within the .ini file in use.
+config = context.config
+
+# Interpret the config file for Python logging.
+# This line sets up loggers basically.
+if config.config_file_name is not None:
+    fileConfig(config.config_file_name)
+
+# add your model's MetaData object here
+# for 'autogenerate' support
+# from myapp import mymodel
+# target_metadata = mymodel.Base.metadata
+target_metadata = None
+
+# other values from the config, defined by the needs of env.py,
+# can be acquired:
+# my_important_option = config.get_main_option("my_important_option")
+# ... etc.
+
+
+def run_migrations_offline() -> None:
+    """Run migrations in 'offline' mode.
+
+    This configures the context with just a URL
+    and not an Engine, though an Engine is acceptable
+    here as well. By skipping the Engine creation
+    we don't even need a DBAPI to be available.
+
+    Calls to context.execute() here emit the given string to the
+    script output.
+
+    """
+    url = config.get_main_option("sqlalchemy.url")
+    context.configure(
+        url=url,
+        target_metadata=target_metadata,
+        literal_binds=True,
+        dialect_opts={"paramstyle": "named"},
+    )
+
+    with context.begin_transaction():
+        context.run_migrations()
+
+
+def run_migrations_online() -> None:
+    """Run migrations in 'online' mode.
+
+    In this scenario we need to create an Engine
+    and associate a connection with the context.
+
+    """
+    connectable = engine_from_config(
+        config.get_section(config.config_ini_section, {}),
+        prefix="sqlalchemy.",
+        poolclass=pool.NullPool,
+    )
+
+    with connectable.connect() as connection:
+        context.configure(connection=connection, target_metadata=target_metadata)
+
+        with context.begin_transaction():
+            context.run_migrations()
+
+
+if context.is_offline_mode():
+    run_migrations_offline()
+else:
+    run_migrations_online()
diff --git a/dags/migrations/migrations.py b/dags/migrations/migrations.py
new file mode 100644
index 0000000..86f3161
--- /dev/null
+++ b/dags/migrations/migrations.py
@@ -0,0 +1,20 @@
+import os
+
+import pendulum
+from airflow.models import DAG
+from airflow.models.param import Param
+from airflow_provider_alembic.operators.alembic import AlembicOperator
+
+with DAG(
+    "migrations",
+    schedule=None,
+    start_date=pendulum.today("UTC").add(days=-1),
+    params={"command": Param("upgrade"), "revision": Param("head")},
+) as dag:
+    AlembicOperator(
+        task_id="alembic_op",
+        conn_id="superset_qa",
+        command="{{ params.command }}",
+        revision="{{ params.revision }}",
+        script_location=f"{os.environ['AIRFLOW_HOME']}/dags/migrations/",
+    )
diff --git a/dags/migrations/script.py.mako b/dags/migrations/script.py.mako
new file mode 100644
index 0000000..fbc4b07
--- /dev/null
+++ b/dags/migrations/script.py.mako
@@ -0,0 +1,26 @@
+"""${message}
+
+Revision ID: ${up_revision}
+Revises: ${down_revision | comma,n}
+Create Date: ${create_date}
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+${imports if imports else ""}
+
+# revision identifiers, used by Alembic.
+revision: str = ${repr(up_revision)}
+down_revision: Union[str, None] = ${repr(down_revision)}
+branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
+depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
+
+
+def upgrade() -> None:
+    ${upgrades if upgrades else "pass"}
+
+
+def downgrade() -> None:
+    ${downgrades if downgrades else "pass"}
diff --git a/dags/migrations/versions/64ac526a078b_my_db_revision.py b/dags/migrations/versions/64ac526a078b_my_db_revision.py
new file mode 100644
index 0000000..9f7be84
--- /dev/null
+++ b/dags/migrations/versions/64ac526a078b_my_db_revision.py
@@ -0,0 +1,50 @@
+"""My DB Revision
+
+Revision ID: 64ac526a078b
+Revises:
+Create Date: 2024-04-11 16:06:50.691527
+
+"""
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "64ac526a078b"
+down_revision: Union[str, None] = None
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade():
+    op.execute("CREATE SCHEMA IF NOT EXISTS oa")
+    op.create_table(
+        "open_access",
+        sa.Column("year", sa.Integer, primary_key=True),
+        sa.Column("closed_access", sa.Integer, nullable=False),
+        sa.Column("bronze_open_access", sa.Integer, nullable=False),
+        sa.Column("green_open_access", sa.Integer, nullable=False),
+        sa.Column("gold_open_access", sa.Integer, nullable=False),
+        sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False),
+        sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False),
+        schema="oa",
+    )
+
+    op.create_table(
+        "golden_open_access",
+        sa.Column("year", sa.Integer, primary_key=True),
+        sa.Column("cern_read_and_publish", sa.Integer, nullable=False),
+        sa.Column("cern_individual_apcs", sa.Integer, nullable=False),
+        sa.Column("scoap3", sa.Integer, nullable=False),
+        sa.Column("other", sa.Integer, nullable=False),
+        sa.Column("other_colectives", sa.Integer, nullable=False),
+        sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False),
+        sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False),
+        schema="oa",
+    )
+
+
+def downgrade():
+    op.drop_table("golden_open_access", schema="oa")
+    op.drop_table("open_access", schema="oa")
diff --git a/docker-compose.yaml b/docker-compose.yaml
new file mode 100644
index 0000000..6e923ab
--- /dev/null
+++ b/docker-compose.yaml
@@ -0,0 +1,22 @@
+services:
+  postgres:
+    image: docker.io/library/postgres:13
+    environment:
+      POSTGRES_USER: airflow
+      POSTGRES_PASSWORD: airflow
+      POSTGRES_DB: airflow
+    volumes:
+      - postgres-db-volume:/var/lib/postgresql/data
+    expose:
+      - 5432
+    ports:
+      - 5432:5432
+    healthcheck:
+      test: ["CMD", "pg_isready", "-U", "airflow"]
+      interval: 10s
+      retries: 5
+      start_period: 5s
+    restart: always
+
+volumes:
+  postgres-db-volume:
diff --git a/requirements-test.txt b/requirements-test.txt
index a3a4d9b..bb8c5df 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,4 @@
-pre-commit==3.5.0
+pre-commit==3.6.0
 pytest==7.4.4
 coverage==7.4.0
 pytest-cov==4.1.0
diff --git a/requirements.txt b/requirements.txt
index ade786d..2f0a994 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,4 @@
--c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.8.txt
-apache-airflow[celery, postgres]==2.8.1
+-c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt
+apache-airflow[celery, postgres, redis]==2.8.1
+alembic==1.13.1
+airflow-provider-alembic==1.0.0
diff --git a/tests/README.md b/tests/README.md
new file mode 100644
index 0000000..457b46c
--- /dev/null
+++ b/tests/README.md
@@ -0,0 +1,102 @@
+# BI-DAGs Setup Guide
+
+This README provides a step-by-step guide on setting up your environment for running BI-DAGs with Airflow.
+
+## Prerequisites
+
+Before you begin, ensure you have `pyenv` installed on your system. If you don't have `pyenv` installed, please follow the instructions [here](https://github.com/pyenv/pyenv#installation).
+
+## Installation Steps
+
+### 1. Set Up Python Environment
+
+First, we'll set up a Python environment using `pyenv`.
+
+```sh
+# Define the desired Python version
+export PYTHON_VERSION=3.10.11
+
+# Install the specified Python version using pyenv
+pyenv install $PYTHON_VERSION
+
+# Set the global Python version to the installed one
+pyenv global $PYTHON_VERSION
+
+# Create a virtual environment named 'bi-dags'
+pyenv virtualenv $PYTHON_VERSION bi-dags
+
+# Activate the virtual environment
+pyenv activate bi-dags
+```
+
+### 2. Navigate to Project Directory
+
+Change your current working directory to `bi-dags`.
+
+```sh
+cd bi-dags
+```
+
+### 3. Install Dependencies
+
+With your virtual environment activated, install the necessary dependencies.
+
+```sh
+pip install -r requirements.txt
+```
+
+### 4. Set Airflow Home
+
+Configure the Airflow home environment variable.
+
+```sh
+export AIRFLOW_HOME=$PWD
+```
+
+### 5. Configure Database Connection
+
+Export the database connection environment variable.
+
+```sh
+# Replace 'your_db_connection_string' with your actual database connection string
+export AIRFLOW__CORE__SQL_ALCHEMY_CONN=your_db_connection_string
+```
+
+### 6. Start Airflow
+
+Initialize and start Airflow using the standalone command.
+
+```sh
+airflow standalone
+```
+
+### 7. Start Postgres with Docker Compose
+
+If you're using Docker to manage your Postgres database, start the service.
+
+```sh
+docker-compose start
+```
+
+### 8. Add Airflow Connections via UI
+
+Lastly, add the necessary Airflow connections through the UI.
+
+- Navigate to Admin -> Connections in the Airflow UI.
+- Click on "Add" and fill in the details:
+  - Connection Id: `superset_qa`
+  - Login: `airflow`
+  - Database: `airflow`
+  - Password: `airflow`
+  - Host: `localhost`
+  - Port: `5432`
+  - Connection Type: `postgres`
+
+After completing these steps, your environment should be set up and ready for running BI-DAGs with Airflow.
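+
+### Optional: Add the Connection via CLI
+
+As an alternative to the UI steps above, the same connection can also be created with the Airflow CLI. This is a minimal sketch that reuses the values listed in step 8; adjust them to your environment.
+
+```sh
+# Create the `superset_qa` Postgres connection with the same values as the UI form
+airflow connections add superset_qa \
+    --conn-type postgres \
+    --conn-host localhost \
+    --conn-port 5432 \
+    --conn-schema airflow \
+    --conn-login airflow \
+    --conn-password airflow
+```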