Skip to content

Commit

Permalink
Dags: migration to SQLAlchemy
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Jul 31, 2024
1 parent 41f5f29 commit 305b317
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 104 deletions.
15 changes: 15 additions & 0 deletions dags/common/models/library/library_cern_publication_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class LibraryCernPublicationRecords(Base):
__tablename__ = "library_cern_publication_records"

year = Column(Integer, primary_key=True)
publications_total_count = Column(Float)
conference_proceedings_count = Column(Float)
non_journal_proceedings_count = Column(Float)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class LibraryNewItemsInTheInstitutionalRepository(Base):
__tablename__ = "library_items_in_the_institutional_repository"

year = Column(Integer, primary_key=True)
inspire_arxiv_records = Column(Float)
inspire_curators_records = Column(Float)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
17 changes: 17 additions & 0 deletions dags/common/models/open_access/oa_golden_open_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class OAGoldenOpenAccess(Base):
__tablename__ = "oa_golden_open_access"

year = Column(Integer, primary_key=True)
cern_read_and_publish = Column(Float)
cern_individual_apcs = Column(Float)
scoap3 = Column(Float)
other = Column(Float)
other_collective_models = Column(Float)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
16 changes: 16 additions & 0 deletions dags/common/models/open_access/open_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class OAOpenAccess(Base):
__tablename__ = "oa_open_access"

year = Column(Integer, primary_key=True)
closed_access = Column(Float)
bronze_open_access = Column(Float)
green_open_access = Column(Float)
gold_open_access = Column(Float)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
31 changes: 31 additions & 0 deletions dags/common/operators/sqlalchemy_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from executor_config import kubernetes_executor_config
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker


def get_session(conn_id: str):
hook = PostgresHook(postgres_conn_id=conn_id)
engine = hook.get_sqlalchemy_engine()
return sessionmaker(bind=engine)()


def sqlalchemy_task(conn_id: str):
def decorator(func):
@task(executor_config=kubernetes_executor_config)
def wrapper(*args, **kwargs):
session = get_session(conn_id)
try:
result = func(*args, session=session, **kwargs)
session.commit()
return result
except SQLAlchemyError as e:
session.rollback()
raise e
finally:
session.close()

return wrapper

return decorator
59 changes: 30 additions & 29 deletions dags/library/cern_publication_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@

import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.models.library.library_cern_publication_records import (
LibraryCernPublicationRecords,
)
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count, request_again_if_failed
from executor_config import kubernetes_executor_config
from library.utils import get_url
from sqlalchemy.sql import func


@dag(
Expand All @@ -31,7 +35,7 @@ def fetch_data_task(key, **kwargs):
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
results["year"] = kwargs["params"].get("year")
return results

results = fetch_data_task.expand(
Expand All @@ -43,35 +47,32 @@ def join(values, **kwargs):
)
unpacked_results = join(results)

PostgresOperator(
task_id="populate_library_cern_publication_records_table",
postgres_conn_id="superset",
sql="""
INSERT INTO library_cern_publication_records (year,
publications_total_count, conference_proceedings_count,
non_journal_proceedings_count, created_at, updated_at)
VALUES (%(years)s, %(publications_total_count)s,
%(conference_proceedings_count)s, %(non_journal_proceedings_count)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
publications_total_count = EXCLUDED.publications_total_count,
conference_proceedings_count = EXCLUDED.conference_proceedings_count,
non_journal_proceedings_count = EXCLUDED.non_journal_proceedings_count,
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"publications_total_count": unpacked_results["publications_total_count"],
"conference_proceedings_count": unpacked_results[
@sqlalchemy_task(conn_id="superset_qa")
def populate_cern_publication_records(results, session, **kwargs):
record = (
session.query(LibraryCernPublicationRecords)
.filter_by(year=results["year"])
.first()
)
if record:
record.publications_total_count = results["publications_total_count"]
record.conference_proceedings_count = results[
"conference_proceedings_count"
],
"non_journal_proceedings_count": unpacked_results[
]
record.non_journal_proceedings_count = results[
"non_journal_proceedings_count"
],
},
executor_config=kubernetes_executor_config,
)
]
record.updated_at = func.now()
else:
new_record = LibraryCernPublicationRecords(
year=results["year"],
publications_total_count=results["publications_total_count"],
conference_proceedings_count=results["conference_proceedings_count"],
non_journal_proceedings_count=results["non_journal_proceedings_count"],
)
session.add(new_record)

populate_cern_publication_records(unpacked_results)


library_cern_publication_records = library_cern_publication_records_dag()
43 changes: 24 additions & 19 deletions dags/library/new_items_in_the_institutional_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import pendulum
from airflow.decorators import dag, task
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.models.library.library_new_items_in_the_institutional_repository import (
LibraryNewItemsInTheInstitutionalRepository,
)
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from sqlalchemy.sql import func
from tenacity import retry_if_exception_type, stop_after_attempt


Expand Down Expand Up @@ -56,25 +60,26 @@ def join_and_add_year(counts, **kwargs):

results = join_and_add_year(counts)

populate_items_in_the_institutional_repository = PostgresOperator(
task_id="populate_items_in_the_institutional_repository",
postgres_conn_id="superset_qa",
sql="""
INSERT INTO items_in_the_institutional_repository (year,
inspire_arxiv_records, inspire_curators_records, created_at, updated_at)
VALUES (%(year)s, %(inspire_arxiv_records)s, %(inspire_curators_records)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
inspire_arxiv_records = EXCLUDED.inspire_arxiv_records,
inspire_curators_records = EXCLUDED.inspire_curators_records,
updated_at = CURRENT_TIMESTAMP;
""",
parameters=results,
executor_config=kubernetes_executor_config,
)
@sqlalchemy_task(conn_id="superset_qa")
def populate_new_items_in_the_institutional_repository(results, session, **kwargs):
record = (
session.query(LibraryNewItemsInTheInstitutionalRepository)
.filter_by(year=results["year"])
.first()
)
if record:
record.inspire_arxiv_records = results["inspire_arxiv_records"]
record.inspire_curators_records = results["inspire_curators_records"]
record.updated_at = func.now()
else:
new_record = LibraryNewItemsInTheInstitutionalRepository(
year=results["year"],
inspire_arxiv_records=results["inspire_arxiv_records"],
inspire_curators_records=results["inspire_curators_records"],
)
session.add(new_record)

counts >> results >> populate_items_in_the_institutional_repository
populate_new_items_in_the_institutional_repository(results)


Library_new_items_in_the_institutional_repository = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

def upgrade():
op.create_table(
"items_in_the_institutional_repository",
"library_items_in_the_institutional_repository",
sa.Column("year", sa.Integer, primary_key=True),
sa.Column("inspire_arxiv_records", sa.Integer, nullable=False),
sa.Column("inspire_curators_records", sa.Integer, nullable=False),
Expand All @@ -29,4 +29,4 @@ def upgrade():


def downgrade():
op.drop_table("items_in_the_institutional_repository")
op.drop_table("library_items_in_the_institutional_repository")
49 changes: 26 additions & 23 deletions dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import pendulum
from airflow.decorators import dag, task
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.models.open_access.oa_golden_open_access import OAGoldenOpenAccess
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from sqlalchemy.sql import func
from tenacity import retry_if_exception_type, stop_after_attempt


Expand Down Expand Up @@ -68,29 +70,30 @@ def join_and_add_year(counts, **kwargs):

results = join_and_add_year(counts)

populate_golden_open_access = PostgresOperator(
task_id="populate_golden_open_access",
postgres_conn_id="superset",
sql="""
INSERT INTO oa_golden_open_access (year, cern_read_and_publish, cern_individual_apcs,
scoap3, other, other_collective_models, created_at, updated_at)
VALUES (%(year)s, %(cern_read_and_publish)s, %(cern_individual_apcs)s,
%(scoap3)s, %(other)s, %(other_collective_models)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
cern_read_and_publish = EXCLUDED.cern_read_and_publish,
cern_individual_apcs = EXCLUDED.cern_individual_apcs,
scoap3 = EXCLUDED.scoap3,
other = EXCLUDED.other,
other_collective_models = EXCLUDED.other_collective_models,
updated_at = CURRENT_TIMESTAMP;
""",
parameters=results,
executor_config=kubernetes_executor_config,
)
@sqlalchemy_task(conn_id="superset_qa")
def populate_golden_open_access(results, session, **kwargs):
record = (
session.query(OAGoldenOpenAccess).filter_by(year=results["year"]).first()
)
if record:
record.cern_read_and_publish = results["cern_read_and_publish"]
record.cern_individual_apcs = results["cern_individual_apcs"]
record.scoap3 = results["scoap3"]
record.other = results["other"]
record.other_collective_models = results["other_collective_models"]
record.updated_at = func.now()
else:
new_record = OAGoldenOpenAccess(
year=results["year"],
cern_read_and_publish=results["cern_read_and_publish"],
cern_individual_apcs=results["cern_individual_apcs"],
scoap3=results["scoap3"],
other=results["other"],
other_collective_models=results["other_collective_models"],
)
session.add(new_record)

counts >> results >> populate_golden_open_access
populate_golden_open_access(results)


OA_gold_open_access_mechanisms = oa_gold_open_access_mechanisms()
Loading

0 comments on commit 305b317

Please sign in to comment.