diff --git a/dags/aps/aps_process_file.py b/dags/aps/aps_process_file.py index 66e5102d..49ae03c8 100644 --- a/dags/aps/aps_process_file.py +++ b/dags/aps/aps_process_file.py @@ -3,11 +3,12 @@ import pendulum from airflow.decorators import dag, task from aps.parser import APSParser +from aps.repository import APSRepository from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask from common.scoap3_s3 import Scoap3Repository -from common.utils import create_or_update_article +from common.utils import create_or_update_article, upload_json_to_s3 from inspire_utils.record import get_value from structlog import get_logger @@ -30,6 +31,8 @@ def enrich_aps(enhanced_file): @dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1)) def aps_process_file(): + s3_client = APSRepository() + @task() def parse(**kwargs): if "params" in kwargs and "article" in kwargs["params"]: @@ -66,6 +69,10 @@ def populate_files(parsed_file): logger.info("Files populated", files=parsed_file["files"]) return parsed_file + @task() + def save_to_s3(enriched_file): + upload_json_to_s3(json_record=enriched_file, repo=s3_client) + @task() def create_or_update(enriched_file): create_or_update_article(enriched_file) @@ -74,6 +81,7 @@ def create_or_update(enriched_file): enhanced_file = enhance(parsed_file) enhanced_file_with_files = populate_files(enhanced_file) enriched_file = enrich(enhanced_file_with_files) + save_to_s3(enriched_file=enriched_file) create_or_update(enriched_file) diff --git a/dags/common/utils.py b/dags/common/utils.py index d2467f86..a6a5bb08 100644 --- a/dags/common/utils.py +++ b/dags/common/utils.py @@ -1,10 +1,12 @@ import datetime +import io import json import os import re import tarfile import xml.etree.ElementTree as ET import zipfile +from datetime import datetime, timezone from ftplib import error_perm from io import StringIO from stat import S_ISDIR, S_ISREG @@ -17,13 +19,11 @@ from common.constants import ( BY_PATTERN, CDATA_PATTERN, + COUNTRIES_DEFAULT_MAPPING, COUNTRY_PARSING_PATTERN, CREATIVE_COMMONS_PATTERN, - LICENSE_PATTERN, -) -from common.constants import ( - COUNTRIES_DEFAULT_MAPPING, INSTITUTIONS_AND_COUNTRIES_MAPPING, + LICENSE_PATTERN, ) from common.exceptions import ( FoundMoreThanOneMatchOrNone, @@ -275,14 +275,13 @@ def create_or_update_article(data): raise - def parse_country_from_value(affiliation_value): for key, val in INSTITUTIONS_AND_COUNTRIES_MAPPING.items(): - if re.search(r'\b%s\b' % key, affiliation_value, flags=re.IGNORECASE): + if re.search(r"\b%s\b" % key, affiliation_value, flags=re.IGNORECASE): return val country = affiliation_value.split(",")[-1].strip() for key, val in COUNTRIES_DEFAULT_MAPPING.items(): - if re.search(r'\b%s\b' % key, country, flags=re.IGNORECASE): + if re.search(r"\b%s\b" % key, country, flags=re.IGNORECASE): return val try: @@ -299,8 +298,21 @@ def find_country_match_from_mapping(affiliation_value): if re.search(r"\b%s\b" % key, affiliation_value, flags=re.IGNORECASE): return COUNTRIES_DEFAULT_MAPPING[key] + def get_country_ISO_name(country): if COUNTRIES_DEFAULT_MAPPING.get(country): return COUNTRIES_DEFAULT_MAPPING[country] else: return country + + +def upload_json_to_s3(repo, json_record): + file_in_bytes = io.BytesIO(json.dumps(json_record, indent=2).encode("utf-8")) + current_date = datetime.now().date() + current_date_str = current_date.strftime("%Y-%m-%d") + current_date_and_time_str = current_date.strftime("%Y-%m-%d_%H:%M:%S") + doi = json_record["dois"][0]["value"] + file_key = os.path.join( + "parsed", current_date_str, f"{doi}__{current_date_and_time_str}.json" + ) + repo.save(file_key, file_in_bytes) diff --git a/dags/elsevier/elsevier_file_processing.py b/dags/elsevier/elsevier_file_processing.py index e64bff8b..258a8f6f 100644 --- a/dags/elsevier/elsevier_file_processing.py +++ b/dags/elsevier/elsevier_file_processing.py @@ -4,7 +4,11 @@ from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask from common.scoap3_s3 import Scoap3Repository -from common.utils import create_or_update_article, parse_without_names_spaces +from common.utils import ( + create_or_update_article, + parse_without_names_spaces, + upload_json_to_s3, +) from elsevier.parser import ElsevierParser from elsevier.repository import ElsevierRepository from inspire_utils.record import get_value @@ -76,6 +80,10 @@ def enrich(enhanced_file): return enrich_elsevier(enhanced_file) raise EmptyOutputFromPreviousTask("enhanced_file_with_metadata") + @task() + def save_to_s3(enriched_file): + upload_json_to_s3(json_record=enriched_file, repo=s3_client) + @task() def create_or_update(enriched_file): create_or_update_article(enriched_file) @@ -84,6 +92,7 @@ def create_or_update(enriched_file): enhanced_file = enhance(parsed_file) enhanced_file_with_files = populate_files(enhanced_file) enriched_file = enrich(enhanced_file_with_files) + save_to_s3(enriched_file=enriched_file) create_or_update(enriched_file) diff --git a/dags/hindawi/hindawi_file_processing.py b/dags/hindawi/hindawi_file_processing.py index b1bfcdb7..103ff671 100644 --- a/dags/hindawi/hindawi_file_processing.py +++ b/dags/hindawi/hindawi_file_processing.py @@ -6,8 +6,9 @@ from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask from common.scoap3_s3 import Scoap3Repository -from common.utils import create_or_update_article +from common.utils import create_or_update_article, upload_json_to_s3 from hindawi.parser import HindawiParser +from hindawi.repository import HindawiRepository from inspire_utils.record import get_value from structlog import get_logger @@ -29,6 +30,8 @@ def enrich_hindawi(enhanced_file): @dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1)) def hindawi_file_processing(): + s3_client = HindawiRepository() + @task() def parse(**kwargs): record = kwargs.get("params", {}).get("record") @@ -68,6 +71,10 @@ def populate_files(parsed_file): logger.info("Files populated", files=parsed_file["files"]) return parsed_file + @task() + def save_to_s3(enriched_file): + upload_json_to_s3(json_record=enriched_file, repo=s3_client) + @task() def create_or_update(enriched_file): create_or_update_article(enriched_file) @@ -76,6 +83,7 @@ def create_or_update(enriched_file): enhanced_file = enhance(parsed_file) enhanced_file_with_files = populate_files(enhanced_file) enriched_file = enrich(enhanced_file_with_files) + save_to_s3(enriched_file=enriched_file) create_or_update(enriched_file) diff --git a/dags/iop/iop_process_file.py b/dags/iop/iop_process_file.py index fd8550e3..f280b2ae 100644 --- a/dags/iop/iop_process_file.py +++ b/dags/iop/iop_process_file.py @@ -7,7 +7,7 @@ from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask from common.scoap3_s3 import Scoap3Repository -from common.utils import create_or_update_article +from common.utils import create_or_update_article, upload_json_to_s3 from inspire_utils.record import get_value from iop.parser import IOPParser from iop.repository import IOPRepository @@ -40,6 +40,8 @@ def iop_enrich_file(enhanced_file): @dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1)) def iop_process_file(): + s3_client = IOPRepository() + @task() def parse_file(**kwargs): return iop_parse_file(**kwargs) @@ -78,10 +80,15 @@ def populate_files(parsed_file): def create_or_update(enriched_file): create_or_update_article(enriched_file) + @task() + def save_to_s3(enriched_file): + upload_json_to_s3(json_record=enriched_file, repo=s3_client) + parsed_file = parse_file() enhanced_file = enhance_file(parsed_file) enhanced_file_with_files = populate_files(enhanced_file) enriched_file = enrich_file(enhanced_file_with_files) + save_to_s3(enriched_file=enriched_file) create_or_update(enriched_file) diff --git a/dags/oup/oup_process_file.py b/dags/oup/oup_process_file.py index 94c91dc3..ac8481d5 100644 --- a/dags/oup/oup_process_file.py +++ b/dags/oup/oup_process_file.py @@ -7,7 +7,11 @@ from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask from common.scoap3_s3 import Scoap3Repository -from common.utils import create_or_update_article, parse_without_names_spaces +from common.utils import ( + create_or_update_article, + parse_without_names_spaces, + upload_json_to_s3, +) from inspire_utils.record import get_value from jsonschema import validate from oup.parser import OUPParser @@ -47,6 +51,8 @@ def oup_validate_record(enriched_file): @dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1)) def oup_process_file(): + s3_client = OUPRepository() + @task() def parse_file(**kwargs): return oup_parse_file(**kwargs) @@ -85,10 +91,15 @@ def populate_files(parsed_file): def create_or_update(enriched_file): create_or_update_article(enriched_file) + @task() + def save_to_s3(enriched_file): + upload_json_to_s3(json_record=enriched_file, repo=s3_client) + parsed_file = parse_file() enhanced_file = enhance_file(parsed_file) enhanced_file_with_files = populate_files(enhanced_file) enriched_file = enrich_file(enhanced_file_with_files) + save_to_s3(enriched_file=enriched_file) create_or_update(enriched_file) diff --git a/dags/springer/springer_process_file.py b/dags/springer/springer_process_file.py index 0b557527..83d82565 100644 --- a/dags/springer/springer_process_file.py +++ b/dags/springer/springer_process_file.py @@ -8,7 +8,7 @@ from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask from common.scoap3_s3 import Scoap3Repository -from common.utils import create_or_update_article +from common.utils import create_or_update_article, upload_json_to_s3 from inspire_utils.record import get_value from jsonschema import validate from springer.parser import SpringerParser @@ -48,6 +48,8 @@ def springer_validate_record(enriched_file): @dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1)) def springer_process_file(): + s3_client = SpringerRepository() + @task() def parse_file(**kwargs): return springer_parse_file(**kwargs) @@ -86,10 +88,15 @@ def populate_files(parsed_file): def create_or_update(enriched_file): create_or_update_article(enriched_file) + @task() + def save_to_s3(enriched_file): + upload_json_to_s3(json_record=enriched_file, repo=s3_client) + parsed_file = parse_file() enhanced_file = enhance_file(parsed_file) enhanced_file_with_files = populate_files(enhanced_file) enriched_file = enrich_file(enhanced_file_with_files) + save_to_s3(enriched_file=enriched_file) create_or_update(enriched_file)