Skip to content

Commit

Permalink
All DAGs: migration to HttpHook
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Jul 31, 2024
1 parent 8aded34 commit d5d4829
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 103 deletions.
5 changes: 0 additions & 5 deletions dags/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ def __init__(self, input, current_year):
)


class DataFetchError(Exception):
    """Signal that fetching data from a URL failed with a given HTTP status.

    The exception message has the form
    ``Data fetch failure, status_code=<status_code>, url=<url>``.

    Args:
        status_code: Status code reported for the failed fetch.
        url: URL that was being fetched.

    Attributes:
        status_code: The value passed as ``status_code``.
        url: The value passed as ``url``.
    """

    def __init__(self, status_code, url):
        super().__init__(f"Data fetch failure, status_code={status_code}, url={url}")
        # Keep the raw values so handlers can branch on them without
        # having to parse the formatted message.
        self.status_code = status_code
        self.url = url


class NotFoundTotalCountOfRecords(Exception):
def __init__(
self,
Expand Down
18 changes: 1 addition & 17 deletions dags/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,7 @@
import datetime
import re

import requests
from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput


def request_again_if_failed(url, cds_token=None):
    """GET ``url``, repeating the request while the server answers 502.

    At most 10 requests are made in total. When ``cds_token`` is given it is
    sent as an ``Authorization: token <cds_token>`` header on every attempt,
    including the retries.

    Args:
        url: Address to request.
        cds_token: Optional token for the ``Authorization`` header.

    Returns:
        The ``requests`` response object whose status code is 200.

    Raises:
        DataFetchError: If the last response has any status other than 200.
    """
    max_attempts = 10
    # Must be passed by keyword: the second positional argument of
    # requests.get is ``params``, which would put the token in the query
    # string instead of the request headers.
    headers = {"Authorization": f"token {cds_token}"} if cds_token else None

    response = requests.get(url, headers=headers)
    attempts = 1
    while response.status_code == 502 and attempts < max_attempts:
        attempts += 1
        response = requests.get(url, headers=headers)

    if response.status_code != 200:
        raise DataFetchError(url=url, status_code=response.status_code)
    return response
from common.exceptions import NotFoundTotalCountOfRecords, WrongInput


def get_total_results_count(data):
Expand Down
79 changes: 44 additions & 35 deletions dags/library/cern_publication_records.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import logging
import os
from functools import reduce

import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count, request_again_if_failed
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from library.utils import get_url
from library.utils import get_endpoint
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
Expand All @@ -16,41 +17,53 @@
params={"year": 2023},
)
def library_cern_publication_records_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(key, **kwargs):
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(key, **kwargs):
year = kwargs["params"].get("year")
cds_token = os.environ.get("CDS_TOKEN")
if not cds_token:
logging.warning("cds token is not set!")
type_of_query = key
url = get_url(type_of_query, year)
data = request_again_if_failed(url=url, cds_token=cds_token)
total = get_total_results_count(data.text)
return {type_of_query: total}
url = get_endpoint(key, year)
return {
"endpoint": url,
"type_of_query": key,
}

@task(executor_config=kubernetes_executor_config)
def fetch_count(parameters):
    """Request one search endpoint and return its result count keyed by query type.

    Args:
        parameters: Mapping with an ``"endpoint"`` entry (passed to the hook
            as the endpoint to request) and a ``"type_of_query"`` entry (used
            as the key of the returned dict).

    Returns:
        A one-item dict ``{type_of_query: count}``, where ``count`` is whatever
        ``get_total_results_count`` returns for the response text.
    """
    cds_hook = HttpHook(http_conn_id="cds", method="GET")
    # Give up after three attempts; only AirflowException triggers a retry.
    retry_policy = {
        "stop": stop_after_attempt(3),
        "retry": retry_if_exception_type(AirflowException),
    }
    cds_response = cds_hook.run_with_advanced_retry(
        endpoint=parameters["endpoint"],
        _retry_args=retry_policy,
    )
    total_records = get_total_results_count(cds_response.text)
    return {parameters["type_of_query"]: total_records}

keys_list = [
"publications_total_count",
"conference_proceedings_count",
"non_journal_proceedings_count",
]

parameters = generate_params.expand(key=keys_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
def join_and_add_year(counts, **kwargs):
year = kwargs["params"].get("year")
results = reduce(lambda a, b: {**a, **b}, counts)
results["year"] = year
return results

results = fetch_data_task.expand(
key=[
"publications_total_count",
"conference_proceedings_count",
"non_journal_proceedings_count",
],
)
unpacked_results = join(results)

results = join_and_add_year(counts)
PostgresOperator(
task_id="populate_library_cern_publication_records_table",
postgres_conn_id="superset",
sql="""
INSERT INTO library_cern_publication_records (year,
publications_total_count, conference_proceedings_count,
non_journal_proceedings_count, created_at, updated_at)
VALUES (%(years)s, %(publications_total_count)s,
VALUES (%(year)s, %(publications_total_count)s,
%(conference_proceedings_count)s, %(non_journal_proceedings_count)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
Expand All @@ -61,14 +74,10 @@ def join(values, **kwargs):
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"publications_total_count": unpacked_results["publications_total_count"],
"conference_proceedings_count": unpacked_results[
"conference_proceedings_count"
],
"non_journal_proceedings_count": unpacked_results[
"non_journal_proceedings_count"
],
"year": results["year"],
"publications_total_count": results["publications_total_count"],
"conference_proceedings_count": results["conference_proceedings_count"],
"non_journal_proceedings_count": results["non_journal_proceedings_count"],
},
executor_config=kubernetes_executor_config,
)
Expand Down
12 changes: 6 additions & 6 deletions dags/library/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
def get_url(key, year):
url = {
"publications_total_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+"
def get_endpoint(key, year):
endpoint = {
"publications_total_count": r"search?wl=0&ln=en&cc=Published+"
+ r"Articles&sc=1&p=%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+and+"
+ rf"year%3A{year}+not+980%3AConferencePaper+not+980%3ABookChapter+not+595%3A%27Not+"
+ r"for+annual+report%27&f=&action_search=Search&of=xm",
"conference_proceedings_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+"
"conference_proceedings_count": r"search?wl=0&ln=en&cc=Published+"
+ r"Articles&p=980%3AARTICLE+and+%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+"
+ rf"and+year%3A{year}+and+980%3AConferencePaper+not+595%3A%27Not+for+annual+"
+ r"report%27&f=&action_search=Search&c=Published+Articles&c=&sf=author&so=a&rm=&rg=10&sc=1&of=xm",
"non_journal_proceedings_count": r"https://cds.cern.ch/search?ln=en&p=affiliation%3ACERN+or+"
"non_journal_proceedings_count": r"search?ln=en&p=affiliation%3ACERN+or+"
+ rf"260%3ACERN+and+260%3A{year}+and+%28980%3ABOOK+or+980%3APROCEEDINGS+or+690%3A%27YELLOW+REPORT%27+"
+ r"or+980%3ABookChapter+or+980%3AREPORT%29+not+595%3A%27Not+for+annual+report%27&action_search="
+ r"Search&op1=a&m1=a&p1=&f1=&c=CERN+Document+Server&sf=&so=d&rm=&rg=10&sc=1&of=xm",
}
return url[key]
return endpoint[key]
15 changes: 8 additions & 7 deletions dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import open_access.constants as constants
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count
Expand All @@ -17,19 +18,19 @@
)
def oa_gold_open_access_mechanisms():
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(query, **kwargs):
def generate_params(query_object, **kwargs):
year = kwargs["params"].get("year")
current_collection = "Published+Articles"
golden_access_base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter+not+595:'Not+for+annual+report"
)
type_of_query = [*query][0]
query_p = rf"{golden_access_base_query}+{query[type_of_query]}"
type_of_query = [*query_object][0]
query = rf"{golden_access_base_query}+{query_object[type_of_query]}"

return {
"endpoint": rf"search?ln=en&cc={current_collection}&p={query_p}"
"endpoint": rf"search?ln=en&cc={current_collection}&p={query}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm",
"type_of_query": type_of_query,
Expand All @@ -42,21 +43,21 @@ def fetch_count(parameters):
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(Exception),
"retry": retry_if_exception_type(AirflowException),
},
)
count = get_total_results_count(response.text)
return {parameters["type_of_query"]: count}

query_list = [
queries_objects_list = [
{"cern_read_and_publish": constants.CERN_READ_AND_PUBLISH},
{"cern_individual_apcs": constants.CERN_INDIVIDUAL_APCS},
{"scoap3": constants.SCOAP3},
{"other": constants.OTHER},
{"other_collective_models": constants.OTHER_COLLECTIVE_MODELS},
]

parameters = generate_params.expand(query=query_list)
parameters = generate_params.expand(query_object=queries_objects_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
Expand Down
83 changes: 55 additions & 28 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count, request_again_if_failed
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
Expand All @@ -15,47 +18,71 @@
params={"year": 2023},
)
def oa_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(query_object, **kwargs):
year = kwargs["params"].get("year")
current_collection = "Published+Articles"
base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter"
)
type_of_query = [*query][0]
url = utils.get_url(query=f"{base_query}")
data = request_again_if_failed(url=url)
total = get_total_results_count(data.text)
type_of_query = [*query_object][0]
query = rf"{base_query}+{query_object[type_of_query]}"

return {
"endpoint": rf"search?ln=en&cc={current_collection}&p={query}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm",
"type_of_query": type_of_query,
}

@task(executor_config=kubernetes_executor_config)
def fetch_count(parameters):
    """Request one open-access search endpoint and return its count by query type.

    The raw count comes from ``get_total_results_count`` applied to the
    response text. For the ``"gold"`` and ``"green"`` query types that count
    is then passed through ``utils.get_golden_access_count`` /
    ``utils.get_green_access_count``, and the adjusted value is returned.

    Args:
        parameters: Mapping with an ``"endpoint"`` entry (passed to the hook
            as the endpoint to request) and a ``"type_of_query"`` entry (used
            as the key of the returned dict).

    Returns:
        A one-item dict ``{type_of_query: total}``.
    """
    type_of_query = parameters["type_of_query"]
    endpoint = parameters["endpoint"]

    http_hook = HttpHook(http_conn_id="cds", method="GET")
    response = http_hook.run_with_advanced_retry(
        endpoint=endpoint,
        # Give up after three attempts; only AirflowException triggers a retry.
        _retry_args={
            "stop": stop_after_attempt(3),
            "retry": retry_if_exception_type(AirflowException),
        },
    )

    total = get_total_results_count(response.text)
    # NOTE(review): the helpers below are handed the connection-relative
    # endpoint, not an absolute URL — confirm they resolve it through the
    # same "cds" connection.
    if type_of_query == "gold":
        total = utils.get_golden_access_count(total, endpoint)
    elif type_of_query == "green":
        total = utils.get_green_access_count(total, endpoint)

    # Return the adjusted total; re-parsing the raw response here would
    # silently drop the gold/green adjustment.
    return {type_of_query: total}

queries_objects_list = [
{"closed": constants.CLOSED_ACCESS},
{"bronze": constants.BRONZE_ACCESS},
{"green": constants.GREEN_ACCESS},
{"gold": constants.GOLD_ACCESS},
]

parameters = generate_params.expand(query_object=queries_objects_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
def join_and_add_year(counts, **kwargs):
year = kwargs["params"].get("year")
results = reduce(lambda a, b: {**a, **b}, counts)
results["year"] = year
return results

results = fetch_data_task.expand(
query=[
{"closed": constants.CLOSED_ACCESS},
{"bronze": constants.BRONZE_ACCESS},
{"green": constants.GREEN_ACCESS},
{"gold": constants.GOLD_ACCESS},
],
)
unpacked_results = join(results)
results = join_and_add_year(counts)

PostgresOperator(
task_id="populate_open_access_table",
postgres_conn_id="superset",
sql="""
INSERT INTO oa_open_access (year, closed_access, bronze_open_access,
green_open_access, gold_open_access, created_at, updated_at)
VALUES (%(years)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s,
VALUES (%(year)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
Expand All @@ -66,11 +93,11 @@ def join(values, **kwargs):
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"closed": unpacked_results["closed"],
"bronze": unpacked_results["bronze"],
"green": unpacked_results["green"],
"gold": unpacked_results["gold"],
"year": results["year"],
"closed": results["closed"],
"bronze": results["bronze"],
"green": results["green"],
"gold": results["gold"],
},
executor_config=kubernetes_executor_config,
)
Expand Down
Loading

0 comments on commit d5d4829

Please sign in to comment.