OA: xml parser bug fix #18

Merged · 1 commit · Apr 24, 2024
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -7,6 +7,7 @@ repos:
rev: "v2.7.1"
hooks:
- id: prettier
exclude: '^tests/integration/cassettes/.*\.yaml$'
- repo: https://github.com/pycqa/isort
rev: "5.12.0"
hooks:
14 changes: 14 additions & 0 deletions dags/common/exceptions.py
@@ -8,3 +8,17 @@ def __init__(self, input, current_year):
class DataFetchError(Exception):
def __init__(self, status_code, url):
super().__init__(f"Data fetch failure, status_code={status_code}, url={url}")


class NotFoundTotalCountOfRecords(Exception):
def __init__(
self,
):
super().__init__("Total count of records is not found!")


class TypeDoesNotExist(Exception):
def __init__(self, type_string, all_types):
super().__init__(
f"{type_string} this type does not exist, Available types: {all_types}"
)
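
Both new exceptions are consumed further down in this diff: get_total_results_count raises NotFoundTotalCountOfRecords when a CDS response carries no total-count marker, and OpenAccessApi._get_records_count raises TypeDoesNotExist for an unknown access type. A minimal usage sketch, with an illustrative lookup_query helper that is not part of the PR:

from common.exceptions import NotFoundTotalCountOfRecords, TypeDoesNotExist

KNOWN_TYPES = {"closed": "...", "bronze": "...", "green": "...", "gold": "..."}

def lookup_query(query_type):
    # Illustrative helper: reject unknown access types up front.
    if query_type not in KNOWN_TYPES:
        raise TypeDoesNotExist(query_type, list(KNOWN_TYPES))
    return KNOWN_TYPES[query_type]

try:
    lookup_query("diamond")
except TypeDoesNotExist as error:
    print(error)  # names the unknown type and lists the available ones

# NotFoundTotalCountOfRecords takes no arguments:
# raise NotFoundTotalCountOfRecords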
11 changes: 2 additions & 9 deletions dags/open_access/gold_open_access_mechanisms.py
@@ -4,7 +4,6 @@
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.exceptions import DataFetchError
from executor_config import kubernetes_executor_config


@@ -24,14 +23,8 @@ def fetch_data_task(query, **kwargs):
)
type_of_query = [*query][0]
url = utils.get_url(f"{golden_access_base_query}+{query[type_of_query]}")
response = utils.get_data(url)
count = 1
while response.status_code == 502 and count != 10:
count = count + 1
response = utils.get_data(url)
if response.status_code != 200:
raise DataFetchError(url, response.status_code)
total = utils.get_total_results_count(response.text)
data = utils.request_again_if_failed(url)
total = utils.get_total_results_count(data.text)
return {type_of_query: total}

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
59 changes: 33 additions & 26 deletions dags/open_access/open_access.py
@@ -1,10 +1,10 @@
from functools import reduce

import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from executor_config import kubernetes_executor_config
from open_access.open_access_api import OpenAccessApi


@dag(
@@ -14,39 +14,46 @@
)
def oa_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
def set_a_year(**kwargs):
year = kwargs["params"].get("year")
base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter"
)
type_of_query = [*query][0]
url = utils.get_url(f"{base_query}+{query[type_of_query]}")
data = utils.get_data(url)
total = utils.get_total_results_count(data.text)
if type_of_query == "gold":
total = utils.get_gold_access_count(total, url)
if type_of_query == "green":
total = total - utils.get_gold_access_count(total, url)
return {type_of_query: total}
return OpenAccessApi(year=year)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
@task(executor_config=kubernetes_executor_config)
def fetch_closed_access(api, **kwargs):
return {"closed": api.get_closed_access_total_count()}

@task(executor_config=kubernetes_executor_config)
def fetch_bronze_access(api, **kwargs):
return {"bronze": api.get_bronze_access_total_count()}

@task(executor_config=kubernetes_executor_config)
def fetch_green_access(api, **kwargs):
return {"green": api.get_green_access_total_count()}

@task(executor_config=kubernetes_executor_config)
def fetch_gold_access(api, **kwargs):
return {"gold": api.get_gold_access_total_count()}

@task(executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
return results

results = fetch_data_task.expand(
query=[
{"closed": utils.closed_access_query},
{"bronze": utils.bronze_access_query},
{"green": utils.green_access_query},
{"gold": utils.gold_access_query},
],
)
api = set_a_year()
closed_access_count = fetch_closed_access(api)
bronze_access_count = fetch_bronze_access(api)
green_access_count = fetch_green_access(api)
gold_access_count = fetch_gold_access(api)

unpacked_results = join(results)
unpacked_results = join(
[
closed_access_count,
bronze_access_count,
green_access_count,
gold_access_count,
]
)

PostgresOperator(
task_id="populate_open_access_table",
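
For reference, the join task folds a list of single-key dictionaries into one mapping before attaching the year. A standalone sketch of that reduce with made-up counts:

from functools import reduce

values = [{"closed": 120}, {"bronze": 15}, {"green": 40}, {"gold": 30}]

# Same left-to-right dict merge the join task performs.
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = 2023
print(results)  # {'closed': 120, 'bronze': 15, 'green': 40, 'gold': 30, 'years': 2023}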
63 changes: 63 additions & 0 deletions dags/open_access/open_access_api.py
@@ -0,0 +1,63 @@
import os

import open_access.utils as utils
from common.exceptions import TypeDoesNotExist
from open_access.parsers import get_golden_access_records_ids


class OpenAccessApi:
query_types = {
"closed": r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+"
+ r"not+540__f:Bronze+not+540__3:preprint",
"bronze": r"540__f:'Bronze'",
"green": r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:"
+ r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'",
"gold": r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')",
}

def __init__(self, year, cds_token=None):
self.base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter"
)
self.cds_token = cds_token or os.environ.get("CDS_TOKEN")

def _get_url(self, query, current_collection="Published+Articles"):
url = (
rf"https://cds.cern.ch/search?ln=en&cc={current_collection}&p={query}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm"
+ (rf"&apikey={self.cds_token}" if self.cds_token else "")
)
return url

def _get_records_count(self, query_type):
if query_type not in self.query_types:
raise TypeDoesNotExist(query_type, list(self.query_types))
self.url = self._get_url(f"{self.base_query}+{self.query_types[query_type]}")
response = utils.request_again_if_failed(self.url)
total = utils.get_total_results_count(response.text)
return int(total)

def get_closed_access_total_count(self):
return self._get_records_count("closed")

def get_bronze_access_total_count(self):
return self._get_records_count("bronze")

def get_green_access_total_count(self):
total = self._get_records_count("green")
total_gold_access_records_inside_of_green = utils.filter_records(
total=total, url=self.url, filter_func=get_golden_access_records_ids
)
return total - total_gold_access_records_inside_of_green

def get_gold_access_total_count(self):
total = self._get_records_count("gold")
total_only_gold_access_records = utils.filter_records(
total=total, url=self.url, filter_func=get_golden_access_records_ids
)
return total_only_gold_access_records
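
A brief usage sketch of the new class outside Airflow; the year is illustrative, each getter issues live requests to cds.cern.ch, and the token falls back to the CDS_TOKEN environment variable when not passed explicitly:

from open_access.open_access_api import OpenAccessApi

api = OpenAccessApi(year=2023)

counts = {
    "closed": api.get_closed_access_total_count(),
    "bronze": api.get_bronze_access_total_count(),
    # green query total minus the gold records found inside the green result set
    "green": api.get_green_access_total_count(),
    # records whose 540 field pairs a CC BY licence with the 'publication' type
    "gold": api.get_gold_access_total_count(),
}
print(counts)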
27 changes: 14 additions & 13 deletions dags/open_access/parsers.py
@@ -18,19 +18,20 @@ def get_golden_access_records_ids(data):
records = xml.findall(".record")
golden_access = []
for record in records:
datafields = record.find("datafield/[@tag='540']")
datafields = record.findall("datafield/[@tag='540']")
if not datafields:
continue
record_type = datafields.find("subfield/[@code='3']")
license = datafields.find("subfield/[@code='a']")
if record_type is not None and license is not None:
if (
"CC" in license.text
and "BY" in license.text
and record_type.text == "publication"
):
record_id = record.find("controlfield/[@tag='001']")
if record_id is not None:
doi = record_id.text
golden_access.append(doi)
for datafield in datafields:
record_type = datafield.find("subfield/[@code='3']")
license = datafield.find("subfield/[@code='a']")
if record_type is not None and license is not None:
if (
"CC" in license.text
and "BY" in license.text
and record_type.text == "publication"
):
record_id = record.find("controlfield/[@tag='001']")
if record_id is not None:
doi = record_id.text
golden_access.append(doi)
return golden_access
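
The behavioural fix here is the switch from find to findall on the 540 datafields: a record can carry several 540 fields, and the CC BY publication licence is not necessarily the first one. A self-contained illustration with a hand-made record (not a real CDS response, namespaces omitted):

import xml.etree.ElementTree as ET

record = ET.fromstring(
    "<record>"
    "<controlfield tag='001'>2894668</controlfield>"
    "<datafield tag='540'>"
    "<subfield code='3'>preprint</subfield>"
    "<subfield code='a'>arXiv nonexclusive-distrib 1.0</subfield>"
    "</datafield>"
    "<datafield tag='540'>"
    "<subfield code='3'>publication</subfield>"
    "<subfield code='a'>CC BY 4.0</subfield>"
    "</datafield>"
    "</record>"
)

# find() stops at the first 540 field (the preprint licence), so the old parser
# skipped records like this one; iterating over findall() also reaches the
# CC BY publication field.
for datafield in record.findall("datafield[@tag='540']"):
    print(datafield.find("subfield[@code='3']").text,
          datafield.find("subfield[@code='a']").text)
# preprint arXiv nonexclusive-distrib 1.0
# publication CC BY 4.0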
65 changes: 22 additions & 43 deletions dags/open_access/utils.py
@@ -2,33 +2,20 @@
import math
import re

import backoff
import requests
from common.exceptions import WrongInput
from open_access.parsers import get_golden_access_records_ids
from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput


def get_url(query, current_collection="Published+Articles"):
url = (
rf"https://cds.cern.ch/search?ln=en&cc={current_collection}&p={query}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm"
)
return url


def get_gold_access_count(total, url):
iterations = math.ceil(total / 100.0)
golden_access_records_ids_count = 0
def request_again_if_failed(url):
response = requests.get(url)
count = 1

for i in range(0, iterations):
jrec = (i * 100) + 1
full_url = f"{url}&jrec={jrec}"
data = get_data(full_url)
golden_access_records_ids_count = golden_access_records_ids_count + len(
get_golden_access_records_ids(data.text)
)
return golden_access_records_ids_count
while response.status_code == 502 and count != 10:
count = count + 1
response = requests.get(url)
if response.status_code != 200:
raise DataFetchError(url=url, status_code=response.status_code)
return response


def get_total_results_count(data):
@@ -41,20 +28,8 @@ def get_total_results_count(data):
total_records_count = match.group(1)
return int(total_records_count)
except AttributeError:
return 0

raise NotFoundTotalCountOfRecords

closed_access_query = (
r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+" + r"not+540__f:Bronze+not+540__3:preprint"
)
bronze_access_query = r"540__f:'Bronze'"
green_access_query = (
r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:"
+ r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'"
)
gold_access_query = (
r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')"
)

cern_read_and_publish = r"540__f:'CERN-RP"
cern_individual_apcs = r"540__f:'CERN-APC'"
@@ -63,16 +38,20 @@ def get_total_results_count(data):
other_collective_models = r"540__f:'Collective'"


@backoff.on_exception(
backoff.expo, requests.exceptions.ProxyError, max_time=1000, max_tries=10
)
def get_data(url):
return requests.get(url)


def check_year(year):
current_year = datetime.date.today().year
if type(year) == int:
if int(year) >= 2004 and int(year) <= current_year:
return year
raise WrongInput(year, current_year)


def filter_records(total, url, filter_func):
iterations = math.ceil(total / 100.0)
records_ids_count = 0
for i in range(0, iterations):
jrec = (i * 100) + 1
full_url = f"{url}&jrec={jrec}"
response = request_again_if_failed(full_url)
records_ids_count = records_ids_count + len(filter_func(response.text))
return records_ids_count
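
Two behaviours worth noting in the reworked helpers: request_again_if_failed retries a 502 response up to nine more times and raises DataFetchError for any other non-200 status, while filter_records pages through the result set 100 records at a time using the jrec offset (rg=100 is hard-coded in the search URL). A small sketch of the pagination arithmetic:

import math

total = 250      # e.g. the value returned by get_total_results_count
page_size = 100  # matches the rg=100 parameter of the CDS search URL

# filter_records fetches one page per iteration and sums filter_func's counts.
for i in range(math.ceil(total / page_size)):
    jrec = (i * page_size) + 1
    print(f"&jrec={jrec}")
# &jrec=1
# &jrec=101
# &jrec=201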
6 changes: 6 additions & 0 deletions tests/open_access/test_parser.py
@@ -1,15 +1,21 @@
from open_access.parsers import get_golden_access_records_ids

expected = [
"2894668",
"2891488",
"2888511",
"2888151",
"2884471",
"2884470",
"2883672",
"2882429",
"2882335",
"2882328",
"2882327",
"2882324",
"2882322",
"2882311",
"2882298",
]


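The assertion itself is collapsed in this view; a plausible shape for it, with a fixture path that is an assumption rather than something taken from the PR:

def test_get_golden_access_records_ids():
    # Hypothetical fixture path: a recorded CDS MARCXML search response.
    with open("tests/open_access/data/cds_search_response.xml") as fixture:
        assert get_golden_access_records_ids(fixture.read()) == expected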