Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OA: revert without API #19

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions dags/open_access/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CLOSED_ACCESS = r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+"
+r"not+540__f:Bronze+not+540__3:preprint"
BRONZE_ACCESS = r"540__f:'Bronze'"
GREEN_ACCESS = r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:"
+r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'"
GOLD_ACCESS = r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')"

CERN_READ_AND_PUBLISH = r"540__f:'CERN-RP"
CERN_INDIVIDUAL_APCS = r"540__f:'CERN-APC'"
SCOAP3 = r"540__f:'SCOAP3'"
OTHER = r"540__f:'Other'"
OTHER_COLLECTIVE_MODELS = r"540__f:'Collective'"
67 changes: 34 additions & 33 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import logging
import os
from functools import reduce

import open_access.constants as constants
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from executor_config import kubernetes_executor_config
from open_access.open_access_api import OpenAccessApi


@dag(
Expand All @@ -14,46 +17,44 @@
)
def oa_dag():
@task(executor_config=kubernetes_executor_config)
def set_a_year(**kwargs):
def fetch_data_task(query, **kwargs):
year = kwargs["params"].get("year")
return OpenAccessApi(year=year)
cds_token = os.environ.get("CDS_TOKEN")
if not cds_token:
logging.warning("cds token is not set!")
base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter"
+ rf"&apikey={cds_token}"
if cds_token
else ""
)
type_of_query = [*query][0]
url = utils.get_url(f"{base_query}+{query[type_of_query]}")
data = utils.request_again_if_failed(url)
total = utils.get_total_results_count(data.text)
if type_of_query == "gold":
total = utils.get_gold_access_count(total, url)
if type_of_query == "green":
total = total - utils.get_gold_access_count(total, url)
return {type_of_query: total}

@task(executor_config=kubernetes_executor_config)
def fetch_closed_access(api, **kwargs):
return api.get_closed_access_total_count()

@task(executor_config=kubernetes_executor_config)
def fetch_bronze_access(api, **kwargs):
return api.get_bronze_access_total_count()

@task(executor_config=kubernetes_executor_config)
def fetch_green_access(api, **kwargs):
return api.get_green_access_total_count()

@task(executor_config=kubernetes_executor_config)
def fetch_gold_access(api, **kwargs):
return api.get_gold_access_total_count()

@task(executor_config=kubernetes_executor_config)
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
return results

api = set_a_year()
closed_access_count = fetch_closed_access(api)
closed_bronze_count = fetch_bronze_access(api)
closed_green__count = fetch_green_access(api)
closed_gold_count = fetch_gold_access(api)

unpacked_results = join(
[
closed_access_count,
closed_bronze_count,
closed_green__count,
closed_gold_count,
]
results = fetch_data_task.expand(
query=[
{"closed_access": constants.CLOSED_ACCESS},
{"bronze_open_access": constants.BRONZE_ACCESS},
{"green_open_access": constants.GREEN_ACCESS},
{"gold_open_access": constants.GOLD_ACCESS},
],
)
unpacked_results = join(results)

PostgresOperator(
task_id="populate_open_access_table",
Expand Down
63 changes: 0 additions & 63 deletions dags/open_access/open_access_api.py

This file was deleted.

14 changes: 5 additions & 9 deletions dags/open_access/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import requests
from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput
from open_access.parsers import get_golden_access_records_ids


def request_again_if_failed(url):
Expand Down Expand Up @@ -31,13 +32,6 @@ def get_total_results_count(data):
raise NotFoundTotalCountOfRecords


cern_read_and_publish = r"540__f:'CERN-RP"
cern_individual_apcs = r"540__f:'CERN-APC'"
scoap3 = r"540__f:'SCOAP3'"
other = r"540__f:'Other'"
other_collective_models = r"540__f:'Collective'"


def check_year(year):
current_year = datetime.date.today().year
if type(year) == int:
Expand All @@ -46,12 +40,14 @@ def check_year(year):
raise WrongInput(year, current_year)


def filter_records(total, url, filter_func):
def get_gold_access_count(total, url):
iterations = math.ceil(total / 100.0)
records_ids_count = 0
for i in range(0, iterations):
jrec = (i * 100) + 1
full_url = f"{url}&jrec={jrec}"
response = request_again_if_failed(full_url)
records_ids_count = records_ids_count + len(filter_func(response.text))
records_ids_count = records_ids_count + len(
get_golden_access_records_ids(response.text)
)
return records_ids_count
Loading