Skip to content

Commit

Permalink
OA: revert without API
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Apr 24, 2024
1 parent 4bfa99f commit 2f3dc20
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 105 deletions.
12 changes: 12 additions & 0 deletions dags/open_access/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Search-query fragments, one per open-access category. Each fragment is
# joined onto a base query with "+" by the code that builds the request URL.
#
# Multi-line fragments are wrapped in parentheses so the pieces form ONE
# string: a continuation line that merely starts with ``+r"..."`` is a
# separate statement (unary plus on a str) and raises TypeError on import.
CLOSED_ACCESS = (
    r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+"
    r"not+540__f:Bronze+not+540__3:preprint"
)
BRONZE_ACCESS = r"540__f:'Bronze'"
GREEN_ACCESS = (
    r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:"
    r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'"
)
# NOTE(review): "OR++" contains a doubled "+"; kept as-is -- confirm whether
# the search endpoint tolerates it or whether a single "+" was intended.
GOLD_ACCESS = r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')"

# Query fragments filtering on the value of field 540__f.
# NOTE(review): the closing single quote after CERN-RP is missing, unlike every
# sibling constant below; value kept byte-identical -- confirm whether this is
# a typo before changing it.
CERN_READ_AND_PUBLISH = r"540__f:'CERN-RP"
CERN_INDIVIDUAL_APCS = r"540__f:'CERN-APC'"
SCOAP3 = r"540__f:'SCOAP3'"
OTHER = r"540__f:'Other'"
OTHER_COLLECTIVE_MODELS = r"540__f:'Collective'"
59 changes: 26 additions & 33 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from functools import reduce

import open_access.constants as constants
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from executor_config import kubernetes_executor_config
from open_access.open_access_api import OpenAccessApi


@dag(
Expand All @@ -14,46 +15,38 @@
)
def oa_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
    """Count the records matching one open-access query for the requested year.

    Args:
        query: single-entry dict mapping a category name (e.g.
            ``"gold_open_access"``) to its search-query fragment.
        **kwargs: task context; ``kwargs["params"]["year"]`` supplies the year.

    Returns:
        A single-entry dict ``{category_name: count}``.
    """
    year = kwargs["params"].get("year")
    base_query = (
        r"(affiliation:CERN+or+595:'For+annual+report')"
        + rf"and+year:{year}+not+980:ConferencePaper+"
        + r"not+980:BookChapter"
    )
    # The mapping carries exactly one entry: category name -> query fragment.
    type_of_query = [*query][0]
    url = utils.get_url(f"{base_query}+{query[type_of_query]}")
    data = utils.request_again_if_failed(url)
    total = utils.get_total_results_count(data.text)
    # The DAG passes keys such as "gold_open_access" / "green_open_access";
    # comparing only against the short names "gold" / "green" never matched,
    # so the gold/green adjustment below was silently skipped. Both spellings
    # are accepted so either naming scheme triggers the adjustment.
    if type_of_query in ("gold", "gold_open_access"):
        total = utils.get_gold_access_count(total, url)
    elif type_of_query in ("green", "green_open_access"):
        total = total - utils.get_gold_access_count(total, url)
    return {type_of_query: total}

@task(executor_config=kubernetes_executor_config)
def fetch_closed_access(api, **kwargs):
return api.get_closed_access_total_count()

@task(executor_config=kubernetes_executor_config)
def fetch_bronze_access(api, **kwargs):
return api.get_bronze_access_total_count()

@task(executor_config=kubernetes_executor_config)
def fetch_green_access(api, **kwargs):
return api.get_green_access_total_count()

@task(executor_config=kubernetes_executor_config)
def fetch_gold_access(api, **kwargs):
return api.get_gold_access_total_count()

@task(executor_config=kubernetes_executor_config)
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
return results

api = set_a_year()
closed_access_count = fetch_closed_access(api)
closed_bronze_count = fetch_bronze_access(api)
closed_green__count = fetch_green_access(api)
closed_gold_count = fetch_gold_access(api)

unpacked_results = join(
[
closed_access_count,
closed_bronze_count,
closed_green__count,
closed_gold_count,
]
results = fetch_data_task.expand(
query=[
{"closed_access": constants.CLOSED_ACCESS},
{"bronze_open_access": constants.BRONZE_ACCESS},
{"green_open_access": constants.GREEN_ACCESS},
{"gold_open_access": constants.GOLD_ACCESS},
],
)
unpacked_results = join(results)

PostgresOperator(
task_id="populate_open_access_table",
Expand Down
63 changes: 0 additions & 63 deletions dags/open_access/open_access_api.py

This file was deleted.

14 changes: 5 additions & 9 deletions dags/open_access/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import requests
from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput
from open_access.parsers import get_golden_access_records_ids


def request_again_if_failed(url):
Expand Down Expand Up @@ -31,13 +32,6 @@ def get_total_results_count(data):
raise NotFoundTotalCountOfRecords


cern_read_and_publish = r"540__f:'CERN-RP"
cern_individual_apcs = r"540__f:'CERN-APC'"
scoap3 = r"540__f:'SCOAP3'"
other = r"540__f:'Other'"
other_collective_models = r"540__f:'Collective'"


def check_year(year):
current_year = datetime.date.today().year
if type(year) == int:
Expand All @@ -46,12 +40,14 @@ def check_year(year):
raise WrongInput(year, current_year)


def get_gold_access_count(total, url):
    """Count the gold open-access records found across all result pages.

    The result set is walked in steps of 100 records: each request appends
    ``&jrec=<index>`` to ``url``, where the index is 1, 101, 201, ... and the
    number of record ids returned by ``get_golden_access_records_ids`` for
    each response is summed.

    Args:
        total: total number of records matched by ``url``; determines how
            many pages are requested (``ceil(total / 100)``).
        url: search URL without the ``jrec`` parameter.

    Returns:
        The summed count of gold open-access record ids; 0 when ``total`` is 0.
    """
    # NOTE(review): assumes the endpoint returns at most 100 records per
    # request -- confirm against the URL built by the caller.
    page_size = 100
    iterations = math.ceil(total / page_size)
    records_ids_count = 0
    for page in range(iterations):
        jrec = page * page_size + 1  # index of the first record on this page
        response = request_again_if_failed(f"{url}&jrec={jrec}")
        records_ids_count += len(get_golden_access_records_ids(response.text))
    return records_ids_count

0 comments on commit 2f3dc20

Please sign in to comment.