From d5d482922bc5b9aa44eb57c5b727d4851bc61e97 Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Wed, 31 Jul 2024 11:44:55 +0200 Subject: [PATCH] All DAGs: migration to HttpHook --- dags/common/exceptions.py | 5 -- dags/common/utils.py | 18 +--- dags/library/cern_publication_records.py | 79 ++++++++++-------- dags/library/utils.py | 12 +-- .../gold_open_access_mechanisms.py | 15 ++-- dags/open_access/open_access.py | 83 ++++++++++++------- dags/open_access/utils.py | 19 +++-- tests/open_access/test_data_harvesting.py | 0 8 files changed, 128 insertions(+), 103 deletions(-) create mode 100644 tests/open_access/test_data_harvesting.py diff --git a/dags/common/exceptions.py b/dags/common/exceptions.py index e259094..c22f416 100644 --- a/dags/common/exceptions.py +++ b/dags/common/exceptions.py @@ -5,11 +5,6 @@ def __init__(self, input, current_year): ) -class DataFetchError(Exception): - def __init__(self, status_code, url): - super().__init__(f"Data fetch failure, status_code={status_code}, url={url}") - - class NotFoundTotalCountOfRecords(Exception): def __init__( self, diff --git a/dags/common/utils.py b/dags/common/utils.py index 6054b52..d01af13 100644 --- a/dags/common/utils.py +++ b/dags/common/utils.py @@ -1,23 +1,7 @@ import datetime import re -import requests -from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput - - -def request_again_if_failed(url, cds_token=None): - if cds_token: - header = {"Authorization": f"token {cds_token}"} - response = requests.get(url, header) - response = requests.get(url) - count = 1 - - while response.status_code == 502 and count != 10: - count = count + 1 - response = requests.get(url) - if response.status_code != 200: - raise DataFetchError(url=url, status_code=response.status_code) - return response +from common.exceptions import NotFoundTotalCountOfRecords, WrongInput def get_total_results_count(data): diff --git a/dags/library/cern_publication_records.py b/dags/library/cern_publication_records.py index 9eedeed..60ad4a8 100644 --- a/dags/library/cern_publication_records.py +++ b/dags/library/cern_publication_records.py @@ -1,13 +1,14 @@ -import logging -import os from functools import reduce import pendulum from airflow.decorators import dag, task +from airflow.exceptions import AirflowException +from airflow.providers.http.hooks.http import HttpHook from airflow.providers.postgres.operators.postgres import PostgresOperator -from common.utils import get_total_results_count, request_again_if_failed +from common.utils import get_total_results_count from executor_config import kubernetes_executor_config -from library.utils import get_url +from library.utils import get_endpoint +from tenacity import retry_if_exception_type, stop_after_attempt @dag( @@ -16,33 +17,45 @@ params={"year": 2023}, ) def library_cern_publication_records_dag(): - @task(executor_config=kubernetes_executor_config) - def fetch_data_task(key, **kwargs): + @task(multiple_outputs=True, executor_config=kubernetes_executor_config) + def generate_params(key, **kwargs): year = kwargs["params"].get("year") - cds_token = os.environ.get("CDS_TOKEN") - if not cds_token: - logging.warning("cds token is not set!") - type_of_query = key - url = get_url(type_of_query, year) - data = request_again_if_failed(url=url, cds_token=cds_token) - total = get_total_results_count(data.text) - return {type_of_query: total} + url = get_endpoint(key, year) + return { + "endpoint": url, + "type_of_query": key, + } + + @task(executor_config=kubernetes_executor_config) + def fetch_count(parameters): + http_hook = HttpHook(http_conn_id="cds", method="GET") + response = http_hook.run_with_advanced_retry( + endpoint=parameters["endpoint"], + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) + count = get_total_results_count(response.text) + return {parameters["type_of_query"]: count} + + keys_list = [ + "publications_total_count", + "conference_proceedings_count", + "non_journal_proceedings_count", + ] + + parameters = generate_params.expand(key=keys_list) + counts = fetch_count.expand(parameters=parameters) @task(multiple_outputs=True, executor_config=kubernetes_executor_config) - def join(values, **kwargs): - results = reduce(lambda a, b: {**a, **b}, values) - results["years"] = kwargs["params"].get("year") + def join_and_add_year(counts, **kwargs): + year = kwargs["params"].get("year") + results = reduce(lambda a, b: {**a, **b}, counts) + results["year"] = year return results - results = fetch_data_task.expand( - key=[ - "publications_total_count", - "conference_proceedings_count", - "non_journal_proceedings_count", - ], - ) - unpacked_results = join(results) - + results = join_and_add_year(counts) PostgresOperator( task_id="populate_library_cern_publication_records_table", postgres_conn_id="superset", @@ -50,7 +63,7 @@ def join(values, **kwargs): INSERT INTO library_cern_publication_records (year, publications_total_count, conference_proceedings_count, non_journal_proceedings_count, created_at, updated_at) - VALUES (%(years)s, %(publications_total_count)s, + VALUES (%(year)s, %(publications_total_count)s, %(conference_proceedings_count)s, %(non_journal_proceedings_count)s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT (year) @@ -61,14 +74,10 @@ def join(values, **kwargs): updated_at = CURRENT_TIMESTAMP; """, parameters={ - "years": unpacked_results["years"], - "publications_total_count": unpacked_results["publications_total_count"], - "conference_proceedings_count": unpacked_results[ - "conference_proceedings_count" - ], - "non_journal_proceedings_count": unpacked_results[ - "non_journal_proceedings_count" - ], + "year": results["year"], + "publications_total_count": results["publications_total_count"], + "conference_proceedings_count": results["conference_proceedings_count"], + "non_journal_proceedings_count": results["non_journal_proceedings_count"], }, executor_config=kubernetes_executor_config, ) diff --git a/dags/library/utils.py b/dags/library/utils.py index 7aa7a18..e22a0a7 100644 --- a/dags/library/utils.py +++ b/dags/library/utils.py @@ -1,16 +1,16 @@ -def get_url(key, year): - url = { - "publications_total_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+" +def get_endpoint(key, year): + endpoint = { + "publications_total_count": r"search?wl=0&ln=en&cc=Published+" + r"Articles&sc=1&p=%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+and+" + rf"year%3A{year}+not+980%3AConferencePaper+not+980%3ABookChapter+not+595%3A%27Not+" + r"for+annual+report%27&f=&action_search=Search&of=xm", - "conference_proceedings_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+" + "conference_proceedings_count": r"search?wl=0&ln=en&cc=Published+" + r"Articles&p=980%3AARTICLE+and+%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+" + rf"and+year%3A{year}+and+980%3AConferencePaper+not+595%3A%27Not+for+annual+" + r"report%27&f=&action_search=Search&c=Published+Articles&c=&sf=author&so=a&rm=&rg=10&sc=1&of=xm", - "non_journal_proceedings_count": r"https://cds.cern.ch/search?ln=en&p=affiliation%3ACERN+or+" + "non_journal_proceedings_count": r"search?ln=en&p=affiliation%3ACERN+or+" + rf"260%3ACERN+and+260%3A{year}+and+%28980%3ABOOK+or+980%3APROCEEDINGS+or+690%3A%27YELLOW+REPORT%27+" + r"or+980%3ABookChapter+or+980%3AREPORT%29+not+595%3A%27Not+for+annual+report%27&action_search=" + r"Search&op1=a&m1=a&p1=&f1=&c=CERN+Document+Server&sf=&so=d&rm=&rg=10&sc=1&of=xm", } - return url[key] + return endpoint[key] diff --git a/dags/open_access/gold_open_access_mechanisms.py b/dags/open_access/gold_open_access_mechanisms.py index f300b42..7a2b93f 100644 --- a/dags/open_access/gold_open_access_mechanisms.py +++ b/dags/open_access/gold_open_access_mechanisms.py @@ -3,6 +3,7 @@ import open_access.constants as constants import pendulum from airflow.decorators import dag, task +from airflow.exceptions import AirflowException from airflow.providers.http.hooks.http import HttpHook from airflow.providers.postgres.operators.postgres import PostgresOperator from common.utils import get_total_results_count @@ -17,7 +18,7 @@ ) def oa_gold_open_access_mechanisms(): @task(multiple_outputs=True, executor_config=kubernetes_executor_config) - def generate_params(query, **kwargs): + def generate_params(query_object, **kwargs): year = kwargs["params"].get("year") current_collection = "Published+Articles" golden_access_base_query = ( @@ -25,11 +26,11 @@ def generate_params(query, **kwargs): + rf"and+year:{year}+not+980:ConferencePaper+" + r"not+980:BookChapter+not+595:'Not+for+annual+report" ) - type_of_query = [*query][0] - query_p = rf"{golden_access_base_query}+{query[type_of_query]}" + type_of_query = [*query_object][0] + query = rf"{golden_access_base_query}+{query_object[type_of_query]}" return { - "endpoint": rf"search?ln=en&cc={current_collection}&p={query_p}" + "endpoint": rf"search?ln=en&cc={current_collection}&p={query}" + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c=" + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm", "type_of_query": type_of_query, @@ -42,13 +43,13 @@ def fetch_count(parameters): endpoint=parameters["endpoint"], _retry_args={ "stop": stop_after_attempt(3), - "retry": retry_if_exception_type(Exception), + "retry": retry_if_exception_type(AirflowException), }, ) count = get_total_results_count(response.text) return {parameters["type_of_query"]: count} - query_list = [ + queries_objects_list = [ {"cern_read_and_publish": constants.CERN_READ_AND_PUBLISH}, {"cern_individual_apcs": constants.CERN_INDIVIDUAL_APCS}, {"scoap3": constants.SCOAP3}, @@ -56,7 +57,7 @@ def fetch_count(parameters): {"other_collective_models": constants.OTHER_COLLECTIVE_MODELS}, ] - parameters = generate_params.expand(query=query_list) + parameters = generate_params.expand(query_object=queries_objects_list) counts = fetch_count.expand(parameters=parameters) @task(multiple_outputs=True, executor_config=kubernetes_executor_config) diff --git a/dags/open_access/open_access.py b/dags/open_access/open_access.py index bdf7c1f..5a6ce62 100644 --- a/dags/open_access/open_access.py +++ b/dags/open_access/open_access.py @@ -4,9 +4,12 @@ import open_access.utils as utils import pendulum from airflow.decorators import dag, task +from airflow.exceptions import AirflowException +from airflow.providers.http.hooks.http import HttpHook from airflow.providers.postgres.operators.postgres import PostgresOperator -from common.utils import get_total_results_count, request_again_if_failed +from common.utils import get_total_results_count from executor_config import kubernetes_executor_config +from tenacity import retry_if_exception_type, stop_after_attempt @dag( @@ -15,39 +18,63 @@ params={"year": 2023}, ) def oa_dag(): - @task(executor_config=kubernetes_executor_config) - def fetch_data_task(query, **kwargs): + @task(multiple_outputs=True, executor_config=kubernetes_executor_config) + def generate_params(query_object, **kwargs): year = kwargs["params"].get("year") + current_collection = "Published+Articles" base_query = ( r"(affiliation:CERN+or+595:'For+annual+report')" + rf"and+year:{year}+not+980:ConferencePaper+" + r"not+980:BookChapter" ) - type_of_query = [*query][0] - url = utils.get_url(query=f"{base_query}") - data = request_again_if_failed(url=url) - total = get_total_results_count(data.text) + type_of_query = [*query_object][0] + query = rf"{base_query}+{query_object[type_of_query]}" + + return { + "endpoint": rf"search?ln=en&cc={current_collection}&p={query}" + + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c=" + + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm", + "type_of_query": type_of_query, + } + + @task(executor_config=kubernetes_executor_config) + def fetch_count(parameters): + http_hook = HttpHook(http_conn_id="cds", method="GET") + response = http_hook.run_with_advanced_retry( + endpoint=parameters["endpoint"], + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) + type_of_query = parameters["type_of_query"] + endpoint = parameters["endpoint"] + total = get_total_results_count(response.text) if type_of_query == "gold": - total = utils.get_golden_access_count(total, url) + total = utils.get_golden_access_count(total, endpoint) if type_of_query == "green": - total = utils.get_green_access_count(total, url) - return {type_of_query: total} + total = utils.get_green_access_count(total, endpoint) + count = get_total_results_count(response.text) + return {parameters["type_of_query"]: count} + + queries_objects_list = [ + {"closed": constants.CLOSED_ACCESS}, + {"bronze": constants.BRONZE_ACCESS}, + {"green": constants.GREEN_ACCESS}, + {"gold": constants.GOLD_ACCESS}, + ] + + parameters = generate_params.expand(query_object=queries_objects_list) + counts = fetch_count.expand(parameters=parameters) @task(multiple_outputs=True, executor_config=kubernetes_executor_config) - def join(values, **kwargs): - results = reduce(lambda a, b: {**a, **b}, values) - results["years"] = kwargs["params"].get("year") + def join_and_add_year(counts, **kwargs): + year = kwargs["params"].get("year") + results = reduce(lambda a, b: {**a, **b}, counts) + results["year"] = year return results - results = fetch_data_task.expand( - query=[ - {"closed": constants.CLOSED_ACCESS}, - {"bronze": constants.BRONZE_ACCESS}, - {"green": constants.GREEN_ACCESS}, - {"gold": constants.GOLD_ACCESS}, - ], - ) - unpacked_results = join(results) + results = join_and_add_year(counts) PostgresOperator( task_id="populate_open_access_table", @@ -55,7 +82,7 @@ def join(values, **kwargs): sql=""" INSERT INTO oa_open_access (year, closed_access, bronze_open_access, green_open_access, gold_open_access, created_at, updated_at) - VALUES (%(years)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s, + VALUES (%(year)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT (year) DO UPDATE SET @@ -66,11 +93,11 @@ def join(values, **kwargs): updated_at = CURRENT_TIMESTAMP; """, parameters={ - "years": unpacked_results["years"], - "closed": unpacked_results["closed"], - "bronze": unpacked_results["bronze"], - "green": unpacked_results["green"], - "gold": unpacked_results["gold"], + "year": results["year"], + "closed": results["closed"], + "bronze": results["bronze"], + "green": results["green"], + "gold": results["gold"], }, executor_config=kubernetes_executor_config, ) diff --git a/dags/open_access/utils.py b/dags/open_access/utils.py index 1f7bc97..5a04274 100644 --- a/dags/open_access/utils.py +++ b/dags/open_access/utils.py @@ -1,31 +1,40 @@ import logging import math -from common.utils import request_again_if_failed +from airflow.exceptions import AirflowException +from airflow.providers.http.hooks.http import HttpHook from open_access.parsers import ( get_golden_access_records_ids, get_green_access_records_ids, ) +from tenacity import retry_if_exception_type, stop_after_attempt -def get_count(total, url, record_extractor): +def get_count_http_hook(total, url, record_extractor): + http_hook = HttpHook(http_conn_id="cds", method="GET") iterations = math.ceil(total / 100.0) records_ids_count = 0 for i in range(0, iterations): jrec = (i * 100) + 1 full_url = f"{url}&jrec={jrec}" - response = request_again_if_failed(full_url) + response = http_hook.run_with_advanced_retry( + endpoint=full_url, + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) records_ids_count = records_ids_count + len(record_extractor(response.text)) logging.info(f"In total was found {records_ids_count} golden access records") return records_ids_count def get_golden_access_count(total, url): - return get_count(total, url, get_golden_access_records_ids) + return get_count_http_hook(total, url, get_golden_access_records_ids) def get_green_access_count(total, url): - return get_count(total, url, get_green_access_records_ids) + return get_count_http_hook(total, url, get_green_access_records_ids) def get_url(query, current_collection="Published+Articles"): diff --git a/tests/open_access/test_data_harvesting.py b/tests/open_access/test_data_harvesting.py new file mode 100644 index 0000000..e69de29