Skip to content

Commit

Permalink
OA: fixes
Browse files Browse the repository at this point in the history
* Parser bug fix.
* Added golden access test.
* Raise an error when the final records count is not found.
* Raise an error when the data fetch failed.
* Created OpenAccess Api.
  • Loading branch information
ErnestaP committed Apr 24, 2024
1 parent 06f5946 commit 9a74ad0
Show file tree
Hide file tree
Showing 10 changed files with 530,948 additions and 91 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ repos:
rev: "v2.7.1"
hooks:
- id: prettier
exclude: '^tests/integration/cassettes/.*\.yaml$'
- repo: https://github.com/pycqa/isort
rev: "5.12.0"
hooks:
Expand Down
14 changes: 14 additions & 0 deletions dags/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,17 @@ def __init__(self, input, current_year):
class DataFetchError(Exception):
    """Raised when fetching data from a URL does not succeed."""

    def __init__(self, status_code, url):
        message = f"Data fetch failure, status_code={status_code}, url={url}"
        super().__init__(message)


class NotFoundTotalCountOfRecords(Exception):
    """Raised when the total count of records cannot be found."""

    def __init__(self):
        message = "Total count of records is not found!"
        super().__init__(message)


class TypeDoesNotExist(Exception):
    """Raised when a requested type is not one of the available types."""

    def __init__(self, type_string, all_types):
        message = (
            f"{type_string} this type does not exist, "
            f"Available types: {all_types}"
        )
        super().__init__(message)
11 changes: 2 additions & 9 deletions dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.exceptions import DataFetchError
from executor_config import kubernetes_executor_config


Expand All @@ -24,14 +23,8 @@ def fetch_data_task(query, **kwargs):
)
type_of_query = [*query][0]
url = utils.get_url(f"{golden_access_base_query}+{query[type_of_query]}")
response = utils.get_data(url)
count = 1
while response.status_code == 502 and count != 10:
count = count + 1
response = utils.get_data(url)
if response.status_code != 200:
raise DataFetchError(url, response.status_code)
total = utils.get_total_results_count(response.text)
data = utils.request_again_if_failed(url)
total = utils.get_total_results_count(data.text)
return {type_of_query: total}

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
Expand Down
59 changes: 33 additions & 26 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from functools import reduce

import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from executor_config import kubernetes_executor_config
from open_access.open_access_api import OpenAccessApi


@dag(
Expand All @@ -14,39 +14,46 @@
)
def oa_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
def set_a_year(**kwargs):
year = kwargs["params"].get("year")
base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter"
)
type_of_query = [*query][0]
url = utils.get_url(f"{base_query}+{query[type_of_query]}")
data = utils.get_data(url)
total = utils.get_total_results_count(data.text)
if type_of_query == "gold":
total = utils.get_gold_access_count(total, url)
if type_of_query == "green":
total = total - utils.get_gold_access_count(total, url)
return {type_of_query: total}
return OpenAccessApi(year=year)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
@task(executor_config=kubernetes_executor_config)
def fetch_closed_access(api, **kwargs):
return api.get_closed_access_total_count()

@task(executor_config=kubernetes_executor_config)
def fetch_bronze_access(api, **kwargs):
return api.get_bronze_access_total_count()

@task(executor_config=kubernetes_executor_config)
def fetch_green_access(api, **kwargs):
return api.get_green_access_total_count()

@task(executor_config=kubernetes_executor_config)
def fetch_gold_access(api, **kwargs):
return api.get_gold_access_total_count()

@task(executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
return results

results = fetch_data_task.expand(
query=[
{"closed": utils.closed_access_query},
{"bronze": utils.bronze_access_query},
{"green": utils.green_access_query},
{"gold": utils.gold_access_query},
],
)
api = set_a_year()
closed_access_count = fetch_closed_access(api)
closed_bronze_count = fetch_bronze_access(api)
closed_green__count = fetch_green_access(api)
closed_gold_count = fetch_gold_access(api)

unpacked_results = join(results)
unpacked_results = join(
[
closed_access_count,
closed_bronze_count,
closed_green__count,
closed_gold_count,
]
)

PostgresOperator(
task_id="populate_open_access_table",
Expand Down
63 changes: 63 additions & 0 deletions dags/open_access/open_access_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os

import open_access.utils as utils
from common.exceptions import TypeDoesNotExist
from open_access.parsers import get_golden_access_records_ids


class OpenAccessApi:
    """Count CERN open-access records on CDS for a given publication year.

    Each ``get_*_total_count`` method combines the shared base query with an
    access-type specific query fragment, requests the resulting CDS search
    URL and returns a record count.
    """

    # Access-type specific query fragments, appended to ``base_query``.
    query_types = {
        "closed": r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+"
        + r"not+540__f:Bronze+not+540__3:preprint",
        "bronze": r"540__f:'Bronze'",
        "green": r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:"
        + r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'",
        "gold": r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')",
    }

    def __init__(self, year, cds_token=None):
        """Prepare the base query for *year*.

        Args:
            year: Publication year interpolated into the base query.
            cds_token: API key appended to every request URL. When omitted,
                the ``CDS_TOKEN`` environment variable is used, if set.
        """
        self.base_query = (
            r"(affiliation:CERN+or+595:'For+annual+report')"
            + rf"and+year:{year}+not+980:ConferencePaper+"
            + r"not+980:BookChapter"
        )
        self.cds_token = cds_token or os.environ.get("CDS_TOKEN")

    def _get_url(self, query, current_collection="Published+Articles"):
        """Return the CDS search URL for *query*.

        The ``apikey`` parameter is appended only when a token is configured.
        Previously the conditional expression wrapped the whole concatenation
        (``a + b + c if token else ""``), so the URL collapsed to an empty
        string whenever no token was available.
        """
        url = (
            rf"https://cds.cern.ch/search?ln=en&cc={current_collection}&p={query}"
            + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
            + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm"
        )
        if self.cds_token:
            url += rf"&apikey={self.cds_token}"
        return url

    def _get_records_count(self, query_type):
        """Return the total number of records matching *query_type*.

        Stores the request URL on ``self.url`` so that callers can page
        through the same result set afterwards.

        Raises:
            TypeDoesNotExist: If *query_type* is not a key of ``query_types``.
        """
        if query_type not in self.query_types:
            # The exception requires both arguments; raising the bare class
            # would fail with TypeError instead of the intended error.
            raise TypeDoesNotExist(query_type, list(self.query_types))
        self.url = self._get_url(f"{self.base_query}+{self.query_types[query_type]}")
        response = utils.request_again_if_failed(self.url)
        total = utils.get_total_results_count(response.text)
        return int(total)

    def get_closed_access_total_count(self):
        """Return the number of records matching the "closed" query."""
        return self._get_records_count("closed")

    def get_bronze_access_total_count(self):
        """Return the number of records matching the "bronze" query."""
        return self._get_records_count("bronze")

    def get_green_access_total_count(self):
        """Return the "green" query total minus the gold records it contains."""
        total = self._get_records_count("green")
        total_gold_access_records_inside_of_green = utils.filter_records(
            total=total, url=self.url, filter_func=get_golden_access_records_ids
        )
        # Gold records that slipped into the green result set are excluded.
        return total - total_gold_access_records_inside_of_green

    def get_gold_access_total_count(self):
        """Return the number of "gold" query results confirmed as gold access."""
        total = self._get_records_count("gold")
        total_only_gold_access_records = utils.filter_records(
            total=total, url=self.url, filter_func=get_golden_access_records_ids
        )
        # Only the records accepted by the gold-access filter are counted.
        return total_only_gold_access_records
27 changes: 14 additions & 13 deletions dags/open_access/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ def get_golden_access_records_ids(data):
records = xml.findall(".record")
golden_access = []
for record in records:
datafields = record.find("datafield/[@tag='540']")
datafields = record.findall("datafield/[@tag='540']")
if datafields is None:
continue
record_type = datafields.find("subfield/[@code='3']")
license = datafields.find("subfield/[@code='a']")
if record_type is not None and license is not None:
if (
"CC" in license.text
and "BY" in license.text
and record_type.text == "publication"
):
record_id = record.find("controlfield/[@tag='001']")
if record_id is not None:
doi = record_id.text
golden_access.append(doi)
for datafield in datafields:
record_type = datafield.find("subfield/[@code='3']")
license = datafield.find("subfield/[@code='a']")
if record_type is not None and license is not None:
if (
"CC" in license.text
and "BY" in license.text
and record_type.text == "publication"
):
record_id = record.find("controlfield/[@tag='001']")
if record_id is not None:
doi = record_id.text
golden_access.append(doi)
return golden_access
65 changes: 22 additions & 43 deletions dags/open_access/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,20 @@
import math
import re

import backoff
import requests
from common.exceptions import WrongInput
from open_access.parsers import get_golden_access_records_ids
from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput


def get_url(query, current_collection="Published+Articles"):
    """Build the CDS search URL for *query* within *current_collection*."""
    return (
        f"https://cds.cern.ch/search?ln=en&cc={current_collection}&p={query}"
        "&action_search=Search&op1=a&m1=a&p1=&f1=&c="
        "Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm"
    )


def get_gold_access_count(total, url):
iterations = math.ceil(total / 100.0)
golden_access_records_ids_count = 0
def request_again_if_failed(url):
response = requests.get(url)
count = 1

for i in range(0, iterations):
jrec = (i * 100) + 1
full_url = f"{url}&jrec={jrec}"
data = get_data(full_url)
golden_access_records_ids_count = golden_access_records_ids_count + len(
get_golden_access_records_ids(data.text)
)
return golden_access_records_ids_count
while response.status_code == 502 and count != 10:
count = count + 1
response = requests.get(url)
if response.status_code != 200:
raise DataFetchError(url=url, status_code=response.status_code)
return response


def get_total_results_count(data):
Expand All @@ -41,20 +28,8 @@ def get_total_results_count(data):
total_records_count = match.group(1)
return int(total_records_count)
except AttributeError:
return 0

raise NotFoundTotalCountOfRecords

closed_access_query = (
r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+" + r"not+540__f:Bronze+not+540__3:preprint"
)
bronze_access_query = r"540__f:'Bronze'"
green_access_query = (
r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:"
+ r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'"
)
gold_access_query = (
r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')"
)

# CDS query fragments matching records by their 540__f value.
# NOTE(review): the closing quote after CERN-RP was missing, leaving the
# phrase unbalanced unlike the sibling constants; confirm against CDS results.
cern_read_and_publish = r"540__f:'CERN-RP'"
cern_individual_apcs = r"540__f:'CERN-APC'"
Expand All @@ -63,16 +38,20 @@ def get_total_results_count(data):
other_collective_models = r"540__f:'Collective'"


@backoff.on_exception(
    backoff.expo,
    requests.exceptions.ProxyError,
    max_time=1000,
    max_tries=10,
)
def get_data(url):
    """Send a GET request to *url*, retrying with exponential backoff on ProxyError."""
    response = requests.get(url)
    return response


def check_year(year):
    """Validate that *year* is an integer between 2004 and the current year.

    Args:
        year: The value to validate.

    Returns:
        ``year`` unchanged when it is valid.

    Raises:
        WrongInput: If ``year`` is not an integer or lies outside the
            inclusive range 2004..current year.
    """
    current_year = datetime.date.today().year
    # bool is an int subclass, but True/False fall outside the range and are
    # therefore still rejected, as with the previous exact type comparison.
    if isinstance(year, int) and 2004 <= year <= current_year:
        return year
    raise WrongInput(year, current_year)


def filter_records(total, url, filter_func):
    """Count the records, over all result pages of *url*, kept by *filter_func*.

    Pages are requested in steps of 100 records through the ``jrec`` offset
    parameter. The text of every page is handed to ``filter_func`` and the
    lengths of its return values are added up.
    """
    page_size = 100
    page_count = math.ceil(total / 100.0)
    matched = 0
    for first_record in range(1, page_count * page_size + 1, page_size):
        page = request_again_if_failed(f"{url}&jrec={first_record}")
        matched += len(filter_func(page.text))
    return matched
Loading

0 comments on commit 9a74ad0

Please sign in to comment.