Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Library KPIs: items in the institutional repo #31

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions dags/common/models/library/library_cern_publication_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class LibraryCernPublicationRecords(Base):
    """ORM mapping for the ``library_cern_publication_records`` table.

    Each row is keyed by ``year`` and carries three publication counters
    plus creation/update timestamps.
    """

    __tablename__ = "library_cern_publication_records"

    # Primary key: at most one row per year.
    year = Column(Integer, primary_key=True)
    # Counters are declared as Float.
    # NOTE(review): these look like whole-number counts -- confirm the type
    # against the migration that creates this table.
    publications_total_count = Column(Float)
    conference_proceedings_count = Column(Float)
    non_journal_proceedings_count = Column(Float)
    # Set to SQL now() when the ORM inserts the row.
    created_at = Column(DateTime, default=func.now())
    # Set to SQL now() on insert and again on every ORM update of the row.
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class LibraryNewItemsInTheInstitutionalRepository(Base):
    """ORM mapping for ``library_items_in_the_institutional_repository``.

    Each row is keyed by ``year`` and stores the two record counters plus
    creation/update timestamps. Column types and nullability mirror the
    Alembic migration that creates the table (revision ``101f23913167``):
    integer, non-null counters and timezone-aware, non-null timestamps.
    """

    __tablename__ = "library_items_in_the_institutional_repository"

    # Primary key: at most one row per year.
    year = Column(Integer, primary_key=True)
    # Counters are whole numbers; the table defines them as INTEGER NOT NULL.
    inspire_arxiv_records = Column(Integer, nullable=False)
    inspire_curators_records = Column(Integer, nullable=False)
    # The table uses TIMESTAMP WITH TIME ZONE NOT NULL and has no server
    # default, so the ORM supplies SQL now() itself on insert.
    created_at = Column(DateTime(timezone=True), nullable=False, default=func.now())
    # Also refreshed with SQL now() on every ORM update of the row.
    updated_at = Column(
        DateTime(timezone=True),
        nullable=False,
        default=func.now(),
        onupdate=func.now(),
    )
17 changes: 17 additions & 0 deletions dags/common/models/open_access/oa_golden_open_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class OAGoldenOpenAccess(Base):
    """ORM mapping for the ``oa_golden_open_access`` table.

    Each row is keyed by ``year`` and carries one value per listed
    category plus creation/update timestamps.
    """

    __tablename__ = "oa_golden_open_access"

    # Primary key: at most one row per year.
    year = Column(Integer, primary_key=True)
    # Per-category values, declared as Float.
    # NOTE(review): presumably whole-number counts -- confirm the type
    # against the migration that creates this table.
    cern_read_and_publish = Column(Float)
    cern_individual_apcs = Column(Float)
    scoap3 = Column(Float)
    other = Column(Float)
    other_collective_models = Column(Float)
    # Set to SQL now() when the ORM inserts the row.
    created_at = Column(DateTime, default=func.now())
    # Set to SQL now() on insert and again on every ORM update of the row.
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
16 changes: 16 additions & 0 deletions dags/common/models/open_access/open_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class OAOpenAccess(Base):
    """ORM mapping for the ``oa_open_access`` table.

    Each row is keyed by ``year`` and carries one value per access
    category plus creation/update timestamps.
    """

    __tablename__ = "oa_open_access"

    # Primary key: at most one row per year.
    year = Column(Integer, primary_key=True)
    # Per-category values, declared as Float.
    # NOTE(review): presumably whole-number counts -- confirm the type
    # against the migration that creates this table.
    closed_access = Column(Float)
    bronze_open_access = Column(Float)
    green_open_access = Column(Float)
    gold_open_access = Column(Float)
    # Set to SQL now() when the ORM inserts the row.
    created_at = Column(DateTime, default=func.now())
    # Set to SQL now() on insert and again on every ORM update of the row.
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
31 changes: 31 additions & 0 deletions dags/common/operators/sqlalchemy_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from executor_config import kubernetes_executor_config
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker


def get_session(conn_id: str):
    """Open a new SQLAlchemy session for an Airflow Postgres connection.

    Args:
        conn_id: Airflow connection id handed to ``PostgresHook``.

    Returns:
        A fresh session bound to the engine the hook provides. The caller
        is responsible for committing and closing it.
    """
    engine = PostgresHook(postgres_conn_id=conn_id).get_sqlalchemy_engine()
    session_factory = sessionmaker(bind=engine)
    return session_factory()


def sqlalchemy_task(conn_id: str):
    """Build a decorator that runs a function as an Airflow task with a DB session.

    The decorated function is called with an extra ``session`` keyword
    argument. The session is committed when the function returns, rolled
    back when it raises a ``SQLAlchemyError`` (which is then re-raised),
    and closed in every case.

    Args:
        conn_id: Airflow connection id used to open the session.

    Returns:
        A decorator turning a function into an Airflow task.
    """

    def decorator(func):
        # NOTE(review): the task callable is always named ``wrapper``; Airflow
        # appears to derive the default task id from the callable's name, so
        # keep this name unless the task id is meant to change -- confirm.
        @task(executor_config=kubernetes_executor_config)
        def wrapper(*args, **kwargs):
            db_session = get_session(conn_id)
            try:
                outcome = func(*args, session=db_session, **kwargs)
                db_session.commit()
            except SQLAlchemyError as err:
                # Undo any pending changes before propagating the DB error.
                db_session.rollback()
                raise err
            else:
                return outcome
            finally:
                db_session.close()

        return wrapper

    return decorator
59 changes: 30 additions & 29 deletions dags/library/cern_publication_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@

import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.models.library.library_cern_publication_records import (
LibraryCernPublicationRecords,
)
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count, request_again_if_failed
from executor_config import kubernetes_executor_config
from library.utils import get_url
from sqlalchemy.sql import func


@dag(
Expand All @@ -31,7 +35,7 @@ def fetch_data_task(key, **kwargs):
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
results["year"] = kwargs["params"].get("year")
return results

results = fetch_data_task.expand(
Expand All @@ -43,35 +47,32 @@ def join(values, **kwargs):
)
unpacked_results = join(results)

PostgresOperator(
task_id="populate_library_cern_publication_records_table",
postgres_conn_id="superset",
sql="""
INSERT INTO library_cern_publication_records (year,
publications_total_count, conference_proceedings_count,
non_journal_proceedings_count, created_at, updated_at)
VALUES (%(years)s, %(publications_total_count)s,
%(conference_proceedings_count)s, %(non_journal_proceedings_count)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
publications_total_count = EXCLUDED.publications_total_count,
conference_proceedings_count = EXCLUDED.conference_proceedings_count,
non_journal_proceedings_count = EXCLUDED.non_journal_proceedings_count,
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"publications_total_count": unpacked_results["publications_total_count"],
"conference_proceedings_count": unpacked_results[
@sqlalchemy_task(conn_id="superset_qa")
def populate_cern_publication_records(results, session, **kwargs):
record = (
session.query(LibraryCernPublicationRecords)
.filter_by(year=results["year"])
.first()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

)
if record:
record.publications_total_count = results["publications_total_count"]
record.conference_proceedings_count = results[
"conference_proceedings_count"
],
"non_journal_proceedings_count": unpacked_results[
]
record.non_journal_proceedings_count = results[
"non_journal_proceedings_count"
],
},
executor_config=kubernetes_executor_config,
)
]
record.updated_at = func.now()
else:
new_record = LibraryCernPublicationRecords(
year=results["year"],
publications_total_count=results["publications_total_count"],
conference_proceedings_count=results["conference_proceedings_count"],
non_journal_proceedings_count=results["non_journal_proceedings_count"],
)
session.add(new_record)

populate_cern_publication_records(unpacked_results)


library_cern_publication_records = library_cern_publication_records_dag()
12 changes: 12 additions & 0 deletions dags/library/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Query-string tail shared by both searches below (search action, collection,
# sort and output-format arguments), already URL-encoded.
_SEARCH_TAIL = (
    r"&action_search=Search&op1=a&m1=a&p1=&f1="
    r"&c=CERN+Document+Server"
    r"&sf=&so=d&rm=&rg=10&sc=1&of=xm&wl=0"
)

# URL-encoded search pattern: 037:'arXiv' and not 980:hidden.
INSPIRE_ARXIV_RECORDS = r"037%3A%27arXiv%27+and+not+980%3Ahidden" + _SEARCH_TAIL

# URL-encoded search pattern: 035:'oai:inspirehep.net' not 037:'arXiv'.
INSPIRE_CURATORS_RECORDS = (
    r"035%3A%27oai%3Ainspirehep.net%27+not+037%3A%27arXiv%27" + _SEARCH_TAIL
)
87 changes: 87 additions & 0 deletions dags/library/new_items_in_the_institutional_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from functools import reduce

import library.constants as constants
import pendulum
from airflow.decorators import dag, task
from airflow.providers.http.hooks.http import HttpHook
from common.models.library.library_new_items_in_the_institutional_repository import (
LibraryNewItemsInTheInstitutionalRepository,
)
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from sqlalchemy.sql import func
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
    start_date=pendulum.today("UTC").add(days=-1),
    schedule_interval="@monthly",
    params={"year": 2023},
)
def library_new_items_in_the_institutional_repository():
    """Count new institutional-repository records for a year and store them.

    For each configured query the DAG builds a search endpoint for the
    ``year`` param, fetches the total result count through the ``cds`` HTTP
    connection, merges the counts into one dict and inserts or updates the
    row for that year.
    """

    @task(executor_config=kubernetes_executor_config, multiple_outputs=True)
    def generate_params(query, **kwargs):
        """Build the search endpoint for one ``{label: query_string}`` entry."""
        year = kwargs["params"].get("year")
        prefix = f"search?ln=en&p=year%3A{year}+"
        # Each mapped input holds a single key; that key labels the count.
        label = list(query)[0]
        endpoint = prefix + query[label]
        return {"endpoint": endpoint, "type_of_query": label}

    @task(executor_config=kubernetes_executor_config)
    def fetch_count(parameters):
        """Request the endpoint (up to 3 attempts) and return ``{label: count}``."""
        cds_hook = HttpHook(http_conn_id="cds", method="GET")
        retry_policy = {
            "stop": stop_after_attempt(3),
            "retry": retry_if_exception_type(Exception),
        }
        response = cds_hook.run_with_advanced_retry(
            endpoint=parameters["endpoint"],
            _retry_args=retry_policy,
        )
        total = get_total_results_count(response.text)
        return {parameters["type_of_query"]: total}

    query_list = [
        {"inspire_arxiv_records": constants.INSPIRE_ARXIV_RECORDS},
        {"inspire_curators_records": constants.INSPIRE_CURATORS_RECORDS},
    ]

    # One mapped task instance per query, then one fetch per generated endpoint.
    parameters = generate_params.expand(query=query_list)
    counts = fetch_count.expand(parameters=parameters)

    @task(multiple_outputs=True, executor_config=kubernetes_executor_config)
    def join_and_add_year(counts, **kwargs):
        """Merge the per-query count dicts into one and add the ``year`` key."""

        def merge(left, right):
            return {**left, **right}

        year = kwargs["params"].get("year")
        merged = reduce(merge, counts)
        merged["year"] = year
        return merged

    results = join_and_add_year(counts)

    @sqlalchemy_task(conn_id="superset_qa")
    def populate_new_items_in_the_institutional_repository(results, session, **kwargs):
        """Insert the row for ``results["year"]`` or update it if it exists."""
        existing = (
            session.query(LibraryNewItemsInTheInstitutionalRepository)
            .filter_by(year=results["year"])
            .first()
        )
        if not existing:
            # No row for this year yet: create it.
            session.add(
                LibraryNewItemsInTheInstitutionalRepository(
                    year=results["year"],
                    inspire_arxiv_records=results["inspire_arxiv_records"],
                    inspire_curators_records=results["inspire_curators_records"],
                )
            )
            return
        existing.inspire_arxiv_records = results["inspire_arxiv_records"]
        existing.inspire_curators_records = results["inspire_curators_records"]
        existing.updated_at = func.now()

    populate_new_items_in_the_institutional_repository(results)


# Call the @dag-decorated function and keep the result in a module-level name.
# NOTE(review): the capitalised name presumably avoids rebinding the factory
# function of the same (lower-case) name -- confirm before renaming.
Library_new_items_in_the_institutional_repository = (
    library_new_items_in_the_institutional_repository()
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Library KPIs: New items in the institutional repository

Revision ID: 101f23913167
Revises: 50d9b3ef5a3b
Create Date: 2024-07-05 17:55:10.676310

"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
# Id of this migration.
revision: str = "101f23913167"
# Id of the migration this one revises ("Revises" in the module docstring).
down_revision: Union[str, None] = "50d9b3ef5a3b"
# No branch labels and no dependencies on other revisions are declared.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade():
    """Create the ``library_items_in_the_institutional_repository`` table."""
    columns = (
        # One row per year.
        sa.Column("year", sa.Integer, primary_key=True),
        sa.Column("inspire_arxiv_records", sa.Integer, nullable=False),
        sa.Column("inspire_curators_records", sa.Integer, nullable=False),
        # Timezone-aware timestamps; no server default is defined here.
        sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False),
        sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False),
    )
    op.create_table("library_items_in_the_institutional_repository", *columns)


def downgrade():
    """Drop the table created by ``upgrade``."""
    table_name = "library_items_in_the_institutional_repository"
    op.drop_table(table_name)
49 changes: 26 additions & 23 deletions dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import pendulum
from airflow.decorators import dag, task
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.models.open_access.oa_golden_open_access import OAGoldenOpenAccess
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from sqlalchemy.sql import func
from tenacity import retry_if_exception_type, stop_after_attempt


Expand Down Expand Up @@ -68,29 +70,30 @@ def join_and_add_year(counts, **kwargs):

results = join_and_add_year(counts)

populate_golden_open_access = PostgresOperator(
task_id="populate_golden_open_access",
postgres_conn_id="superset",
sql="""
INSERT INTO oa_golden_open_access (year, cern_read_and_publish, cern_individual_apcs,
scoap3, other, other_collective_models, created_at, updated_at)
VALUES (%(year)s, %(cern_read_and_publish)s, %(cern_individual_apcs)s,
%(scoap3)s, %(other)s, %(other_collective_models)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
cern_read_and_publish = EXCLUDED.cern_read_and_publish,
cern_individual_apcs = EXCLUDED.cern_individual_apcs,
scoap3 = EXCLUDED.scoap3,
other = EXCLUDED.other,
other_collective_models = EXCLUDED.other_collective_models,
updated_at = CURRENT_TIMESTAMP;
""",
parameters=results,
executor_config=kubernetes_executor_config,
)
@sqlalchemy_task(conn_id="superset_qa")
def populate_golden_open_access(results, session, **kwargs):
    """Insert the ``oa_golden_open_access`` row for a year or update it.

    Args:
        results: Mapping with ``year`` and one value per category column.
        session: SQLAlchemy session supplied by ``sqlalchemy_task``.
    """
    value_columns = (
        "cern_read_and_publish",
        "cern_individual_apcs",
        "scoap3",
        "other",
        "other_collective_models",
    )
    # Query.first() returns None when no row matches, so a missing year
    # falls through to the insert branch rather than raising.
    existing = (
        session.query(OAGoldenOpenAccess).filter_by(year=results["year"]).first()
    )
    if not existing:
        session.add(
            OAGoldenOpenAccess(
                year=results["year"],
                **{column: results[column] for column in value_columns},
            )
        )
        return
    for column in value_columns:
        setattr(existing, column, results[column])
    existing.updated_at = func.now()

counts >> results >> populate_golden_open_access
populate_golden_open_access(results)


OA_gold_open_access_mechanisms = oa_gold_open_access_mechanisms()
Loading
Loading