diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2fc4a58..5c88959 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,6 +7,7 @@ repos: rev: "v2.7.1" hooks: - id: prettier + exclude: '^tests/integration/cassettes/.*\.yaml$' - repo: https://github.com/pycqa/isort rev: "5.12.0" hooks: diff --git a/dags/common/exceptions.py b/dags/common/exceptions.py index 199a119..e259094 100644 --- a/dags/common/exceptions.py +++ b/dags/common/exceptions.py @@ -8,3 +8,17 @@ def __init__(self, input, current_year): class DataFetchError(Exception): def __init__(self, status_code, url): super().__init__(f"Data fetch failure, status_code={status_code}, url={url}") + + +class NotFoundTotalCountOfRecords(Exception): + def __init__( + self, + ): + super().__init__("Total count of records is not found!") + + +class TypeDoesNotExist(Exception): + def __init__(self, type_string, all_types): + super().__init__( + f"{type_string} this type does not exist, Available types: {all_types}" + ) diff --git a/dags/open_access/gold_open_access_mechanisms.py b/dags/open_access/gold_open_access_mechanisms.py index c008b01..b290051 100644 --- a/dags/open_access/gold_open_access_mechanisms.py +++ b/dags/open_access/gold_open_access_mechanisms.py @@ -4,7 +4,6 @@ import pendulum from airflow.decorators import dag, task from airflow.providers.postgres.operators.postgres import PostgresOperator -from common.exceptions import DataFetchError from executor_config import kubernetes_executor_config @@ -24,14 +23,8 @@ def fetch_data_task(query, **kwargs): ) type_of_query = [*query][0] url = utils.get_url(f"{golden_access_base_query}+{query[type_of_query]}") - response = utils.get_data(url) - count = 1 - while response.status_code == 502 and count != 10: - count = count + 1 - response = utils.get_data(url) - if response.status_code != 200: - raise DataFetchError(url, response.status_code) - total = utils.get_total_results_count(response.text) + data = utils.request_again_if_failed(url) + total = utils.get_total_results_count(data.text) return {type_of_query: total} @task(multiple_outputs=True, executor_config=kubernetes_executor_config) diff --git a/dags/open_access/open_access.py b/dags/open_access/open_access.py index 8873070..e8ca968 100644 --- a/dags/open_access/open_access.py +++ b/dags/open_access/open_access.py @@ -1,10 +1,10 @@ from functools import reduce -import open_access.utils as utils import pendulum from airflow.decorators import dag, task from airflow.providers.postgres.operators.postgres import PostgresOperator from executor_config import kubernetes_executor_config +from open_access.open_access_api import OpenAccessApi @dag( @@ -14,39 +14,46 @@ ) def oa_dag(): @task(executor_config=kubernetes_executor_config) - def fetch_data_task(query, **kwargs): + def set_a_year(**kwargs): year = kwargs["params"].get("year") - base_query = ( - r"(affiliation:CERN+or+595:'For+annual+report')" - + rf"and+year:{year}+not+980:ConferencePaper+" - + r"not+980:BookChapter" - ) - type_of_query = [*query][0] - url = utils.get_url(f"{base_query}+{query[type_of_query]}") - data = utils.get_data(url) - total = utils.get_total_results_count(data.text) - if type_of_query == "gold": - total = utils.get_gold_access_count(total, url) - if type_of_query == "green": - total = total - utils.get_gold_access_count(total, url) - return {type_of_query: total} + return OpenAccessApi(year=year) - @task(multiple_outputs=True, executor_config=kubernetes_executor_config) + @task(executor_config=kubernetes_executor_config) + def fetch_closed_access(api, **kwargs): + return api.get_closed_access_total_count() + + @task(executor_config=kubernetes_executor_config) + def fetch_bronze_access(api, **kwargs): + return api.get_bronze_access_total_count() + + @task(executor_config=kubernetes_executor_config) + def fetch_green_access(api, **kwargs): + return api.get_green_access_total_count() + + @task(executor_config=kubernetes_executor_config) + def fetch_gold_access(api, **kwargs): + return api.get_gold_access_total_count() + + @task(executor_config=kubernetes_executor_config) def join(values, **kwargs): results = reduce(lambda a, b: {**a, **b}, values) results["years"] = kwargs["params"].get("year") return results - results = fetch_data_task.expand( - query=[ - {"closed": utils.closed_access_query}, - {"bronze": utils.bronze_access_query}, - {"green": utils.green_access_query}, - {"gold": utils.gold_access_query}, - ], - ) + api = set_a_year() + closed_access_count = fetch_closed_access(api) + closed_bronze_count = fetch_bronze_access(api) + closed_green__count = fetch_green_access(api) + closed_gold_count = fetch_gold_access(api) - unpacked_results = join(results) + unpacked_results = join( + [ + closed_access_count, + closed_bronze_count, + closed_green__count, + closed_gold_count, + ] + ) PostgresOperator( task_id="populate_open_access_table", diff --git a/dags/open_access/open_access_api.py b/dags/open_access/open_access_api.py new file mode 100644 index 0000000..d679200 --- /dev/null +++ b/dags/open_access/open_access_api.py @@ -0,0 +1,63 @@ +import os + +import open_access.utils as utils +from common.exceptions import TypeDoesNotExist +from open_access.parsers import get_golden_access_records_ids + + +class OpenAccessApi: + query_types = { + "closed": r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+" + + r"not+540__f:Bronze+not+540__3:preprint", + "bronze": r"540__f:'Bronze'", + "green": r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:" + + r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'", + "gold": r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')", + } + + def __init__(self, year, cds_token=None): + self.base_query = ( + r"(affiliation:CERN+or+595:'For+annual+report')" + + rf"and+year:{year}+not+980:ConferencePaper+" + + r"not+980:BookChapter" + ) + self.cds_token = cds_token or os.environ.get("CDS_TOKEN") + + def _get_url(self, query, current_collection="Published+Articles"): + url = ( + rf"https://cds.cern.ch/search?ln=en&cc={current_collection}&p={query}" + + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c=" + + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm" + + rf"&apikey={self.cds_token}" + if self.cds_token + else "" + ) + return url + + def _get_records_count(self, query_type): + if query_type not in self.query_types: + raise TypeDoesNotExist + self.url = self._get_url(f"{self.base_query}+{self.query_types[query_type]}") + response = utils.request_again_if_failed(self.url) + total = utils.get_total_results_count(response.text) + return int(total) + + def get_closed_access_total_count(self): + return self._get_records_count("closed") + + def get_bronze_access_total_count(self): + return self._get_records_count("bronze") + + def get_green_access_total_count(self): + total = self._get_records_count("green") + total_gold_access_records_inside_of_green = utils.filter_records( + total=total, url=self.url, filter_func=get_golden_access_records_ids + ) + return total - total_gold_access_records_inside_of_green + + def get_gold_access_total_count(self): + total = self._get_records_count("gold") + total_only_gold_access_records = utils.filter_records( + total=total, url=self.url, filter_func=get_golden_access_records_ids + ) + return total_only_gold_access_records diff --git a/dags/open_access/parsers.py b/dags/open_access/parsers.py index 3fd6da6..55da0ca 100644 --- a/dags/open_access/parsers.py +++ b/dags/open_access/parsers.py @@ -18,19 +18,20 @@ def get_golden_access_records_ids(data): records = xml.findall(".record") golden_access = [] for record in records: - datafields = record.find("datafield/[@tag='540']") + datafields = record.findall("datafield/[@tag='540']") if datafields is None: continue - record_type = datafields.find("subfield/[@code='3']") - license = datafields.find("subfield/[@code='a']") - if record_type is not None and license is not None: - if ( - "CC" in license.text - and "BY" in license.text - and record_type.text == "publication" - ): - record_id = record.find("controlfield/[@tag='001']") - if record_id is not None: - doi = record_id.text - golden_access.append(doi) + for datafield in datafields: + record_type = datafield.find("subfield/[@code='3']") + license = datafield.find("subfield/[@code='a']") + if record_type is not None and license is not None: + if ( + "CC" in license.text + and "BY" in license.text + and record_type.text == "publication" + ): + record_id = record.find("controlfield/[@tag='001']") + if record_id is not None: + doi = record_id.text + golden_access.append(doi) return golden_access diff --git a/dags/open_access/utils.py b/dags/open_access/utils.py index 0bf334e..243ab62 100644 --- a/dags/open_access/utils.py +++ b/dags/open_access/utils.py @@ -2,33 +2,20 @@ import math import re -import backoff import requests -from common.exceptions import WrongInput -from open_access.parsers import get_golden_access_records_ids +from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput -def get_url(query, current_collection="Published+Articles"): - url = ( - rf"https://cds.cern.ch/search?ln=en&cc={current_collection}&p={query}" - + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c=" - + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm" - ) - return url - - -def get_gold_access_count(total, url): - iterations = math.ceil(total / 100.0) - golden_access_records_ids_count = 0 +def request_again_if_failed(url): + response = requests.get(url) + count = 1 - for i in range(0, iterations): - jrec = (i * 100) + 1 - full_url = f"{url}&jrec={jrec}" - data = get_data(full_url) - golden_access_records_ids_count = golden_access_records_ids_count + len( - get_golden_access_records_ids(data.text) - ) - return golden_access_records_ids_count + while response.status_code == 502 and count != 10: + count = count + 1 + response = requests.get(url) + if response.status_code != 200: + raise DataFetchError(url=url, status_code=response.status_code) + return response def get_total_results_count(data): @@ -41,20 +28,8 @@ def get_total_results_count(data): total_records_count = match.group(1) return int(total_records_count) except AttributeError: - return 0 - + raise NotFoundTotalCountOfRecords -closed_access_query = ( - r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+" + r"not+540__f:Bronze+not+540__3:preprint" -) -bronze_access_query = r"540__f:'Bronze'" -green_access_query = ( - r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:" - + r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'" -) -gold_access_query = ( - r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')" -) cern_read_and_publish = r"540__f:'CERN-RP" cern_individual_apcs = r"540__f:'CERN-APC'" @@ -63,16 +38,20 @@ def get_total_results_count(data): other_collective_models = r"540__f:'Collective'" -@backoff.on_exception( - backoff.expo, requests.exceptions.ProxyError, max_time=1000, max_tries=10 -) -def get_data(url): - return requests.get(url) - - def check_year(year): current_year = datetime.date.today().year if type(year) == int: if int(year) >= 2004 and int(year) <= current_year: return year raise WrongInput(year, current_year) + + +def filter_records(total, url, filter_func): + iterations = math.ceil(total / 100.0) + records_ids_count = 0 + for i in range(0, iterations): + jrec = (i * 100) + 1 + full_url = f"{url}&jrec={jrec}" + response = request_again_if_failed(full_url) + records_ids_count = records_ids_count + len(filter_func(response.text)) + return records_ids_count diff --git a/tests/open_access/test_parser.py b/tests/open_access/test_parser.py index 1987617..599a76d 100644 --- a/tests/open_access/test_parser.py +++ b/tests/open_access/test_parser.py @@ -1,15 +1,21 @@ from open_access.parsers import get_golden_access_records_ids expected = [ + "2894668", "2891488", "2888511", + "2888151", "2884471", "2884470", "2883672", "2882429", "2882335", + "2882328", + "2882327", "2882324", + "2882322", "2882311", + "2882298", ]