From eec1a1fdc24e2fc024cace0458cc53ed929a7a63 Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Wed, 24 Apr 2024 18:14:36 +0200 Subject: [PATCH] OA: revert --- dags/open_access/constants.py | 12 ++++++ dags/open_access/open_access.py | 67 +++++++++++++++-------------- dags/open_access/open_access_api.py | 63 --------------------------- dags/open_access/utils.py | 14 +++--- 4 files changed, 51 insertions(+), 105 deletions(-) create mode 100644 dags/open_access/constants.py delete mode 100644 dags/open_access/open_access_api.py diff --git a/dags/open_access/constants.py b/dags/open_access/constants.py new file mode 100644 index 0000000..11f7c75 --- /dev/null +++ b/dags/open_access/constants.py @@ -0,0 +1,12 @@ +CLOSED_ACCESS = r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+" ++r"not+540__f:Bronze+not+540__3:preprint" +BRONZE_ACCESS = r"540__f:'Bronze'" +GREEN_ACCESS = r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:" ++r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'" +GOLD_ACCESS = r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')" + +CERN_READ_AND_PUBLISH = r"540__f:'CERN-RP" +CERN_INDIVIDUAL_APCS = r"540__f:'CERN-APC'" +SCOAP3 = r"540__f:'SCOAP3'" +OTHER = r"540__f:'Other'" +OTHER_COLLECTIVE_MODELS = r"540__f:'Collective'" diff --git a/dags/open_access/open_access.py b/dags/open_access/open_access.py index e8ca968..79e4d61 100644 --- a/dags/open_access/open_access.py +++ b/dags/open_access/open_access.py @@ -1,10 +1,13 @@ +import logging +import os from functools import reduce +import open_access.constants as constants +import open_access.utils as utils import pendulum from airflow.decorators import dag, task from airflow.providers.postgres.operators.postgres import PostgresOperator from executor_config import kubernetes_executor_config -from open_access.open_access_api import OpenAccessApi @dag( @@ -14,46 +17,44 @@ ) def oa_dag(): @task(executor_config=kubernetes_executor_config) - def set_a_year(**kwargs): + def fetch_data_task(query, **kwargs): year = kwargs["params"].get("year") - return OpenAccessApi(year=year) + cds_token = os.environ.get("CDS_TOKEN") + if not cds_token: + logging.warning("cds token is not set!") + base_query = ( + r"(affiliation:CERN+or+595:'For+annual+report')" + + rf"and+year:{year}+not+980:ConferencePaper+" + + r"not+980:BookChapter" + + rf"&apikey={cds_token}" + if cds_token + else "" + ) + type_of_query = [*query][0] + url = utils.get_url(f"{base_query}+{query[type_of_query]}") + data = utils.request_again_if_failed(url) + total = utils.get_total_results_count(data.text) + if type_of_query == "gold": + total = utils.get_gold_access_count(total, url) + if type_of_query == "green": + total = total - utils.get_gold_access_count(total, url) + return {type_of_query: total} - @task(executor_config=kubernetes_executor_config) - def fetch_closed_access(api, **kwargs): - return api.get_closed_access_total_count() - - @task(executor_config=kubernetes_executor_config) - def fetch_bronze_access(api, **kwargs): - return api.get_bronze_access_total_count() - - @task(executor_config=kubernetes_executor_config) - def fetch_green_access(api, **kwargs): - return api.get_green_access_total_count() - - @task(executor_config=kubernetes_executor_config) - def fetch_gold_access(api, **kwargs): - return api.get_gold_access_total_count() - - @task(executor_config=kubernetes_executor_config) + @task(multiple_outputs=True, executor_config=kubernetes_executor_config) def join(values, **kwargs): results = reduce(lambda a, b: {**a, **b}, values) results["years"] = kwargs["params"].get("year") return results - api = set_a_year() - closed_access_count = fetch_closed_access(api) - closed_bronze_count = fetch_bronze_access(api) - closed_green__count = fetch_green_access(api) - closed_gold_count = fetch_gold_access(api) - - unpacked_results = join( - [ - closed_access_count, - closed_bronze_count, - closed_green__count, - closed_gold_count, - ] + results = fetch_data_task.expand( + query=[ + {"closed_access": constants.CLOSED_ACCESS}, + {"bronze_open_access": constants.BRONZE_ACCESS}, + {"green_open_access": constants.GREEN_ACCESS}, + {"gold_open_access": constants.GOLD_ACCESS}, + ], ) + unpacked_results = join(results) PostgresOperator( task_id="populate_open_access_table", diff --git a/dags/open_access/open_access_api.py b/dags/open_access/open_access_api.py deleted file mode 100644 index d679200..0000000 --- a/dags/open_access/open_access_api.py +++ /dev/null @@ -1,63 +0,0 @@ -import os - -import open_access.utils as utils -from common.exceptions import TypeDoesNotExist -from open_access.parsers import get_golden_access_records_ids - - -class OpenAccessApi: - query_types = { - "closed": r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+" - + r"not+540__f:Bronze+not+540__3:preprint", - "bronze": r"540__f:'Bronze'", - "green": r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:" - + r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'", - "gold": r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')", - } - - def __init__(self, year, cds_token=None): - self.base_query = ( - r"(affiliation:CERN+or+595:'For+annual+report')" - + rf"and+year:{year}+not+980:ConferencePaper+" - + r"not+980:BookChapter" - ) - self.cds_token = cds_token or os.environ.get("CDS_TOKEN") - - def _get_url(self, query, current_collection="Published+Articles"): - url = ( - rf"https://cds.cern.ch/search?ln=en&cc={current_collection}&p={query}" - + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c=" - + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm" - + rf"&apikey={self.cds_token}" - if self.cds_token - else "" - ) - return url - - def _get_records_count(self, query_type): - if query_type not in self.query_types: - raise TypeDoesNotExist - self.url = self._get_url(f"{self.base_query}+{self.query_types[query_type]}") - response = utils.request_again_if_failed(self.url) - total = utils.get_total_results_count(response.text) - return int(total) - - def get_closed_access_total_count(self): - return self._get_records_count("closed") - - def get_bronze_access_total_count(self): - return self._get_records_count("bronze") - - def get_green_access_total_count(self): - total = self._get_records_count("green") - total_gold_access_records_inside_of_green = utils.filter_records( - total=total, url=self.url, filter_func=get_golden_access_records_ids - ) - return total - total_gold_access_records_inside_of_green - - def get_gold_access_total_count(self): - total = self._get_records_count("gold") - total_only_gold_access_records = utils.filter_records( - total=total, url=self.url, filter_func=get_golden_access_records_ids - ) - return total_only_gold_access_records diff --git a/dags/open_access/utils.py b/dags/open_access/utils.py index 243ab62..dd6ecdb 100644 --- a/dags/open_access/utils.py +++ b/dags/open_access/utils.py @@ -4,6 +4,7 @@ import requests from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput +from open_access.parsers import get_golden_access_records_ids def request_again_if_failed(url): @@ -31,13 +32,6 @@ def get_total_results_count(data): raise NotFoundTotalCountOfRecords -cern_read_and_publish = r"540__f:'CERN-RP" -cern_individual_apcs = r"540__f:'CERN-APC'" -scoap3 = r"540__f:'SCOAP3'" -other = r"540__f:'Other'" -other_collective_models = r"540__f:'Collective'" - - def check_year(year): current_year = datetime.date.today().year if type(year) == int: @@ -46,12 +40,14 @@ def check_year(year): raise WrongInput(year, current_year) -def filter_records(total, url, filter_func): +def get_gold_access_count(total, url): iterations = math.ceil(total / 100.0) records_ids_count = 0 for i in range(0, iterations): jrec = (i * 100) + 1 full_url = f"{url}&jrec={jrec}" response = request_again_if_failed(full_url) - records_ids_count = records_ids_count + len(filter_func(response.text)) + records_ids_count = records_ids_count + len( + get_golden_access_records_ids(response.text) + ) return records_ids_count