Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

DAGs: saving json output to s3 #206

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion dags/aps/aps_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import pendulum
from airflow.decorators import dag, task
from aps.parser import APSParser
from aps.repository import APSRepository
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article
from common.utils import create_or_update_article, upload_json_to_s3
from inspire_utils.record import get_value
from structlog import get_logger

Expand All @@ -30,6 +31,8 @@ def enrich_aps(enhanced_file):

@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def aps_process_file():
s3_client = APSRepository()

@task()
def parse(**kwargs):
if "params" in kwargs and "article" in kwargs["params"]:
Expand Down Expand Up @@ -66,6 +69,10 @@ def populate_files(parsed_file):
logger.info("Files populated", files=parsed_file["files"])
return parsed_file

@task()
def save_to_s3(enriched_file):
    """Persist the enriched article record as a JSON file in the S3 repository."""
    upload_json_to_s3(enriched_file, s3_client)

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)
Expand All @@ -74,6 +81,7 @@ def create_or_update(enriched_file):
enhanced_file = enhance(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
26 changes: 19 additions & 7 deletions dags/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import io
import json
import os
import re
Expand All @@ -8,6 +9,7 @@
from ftplib import error_perm
from io import StringIO
from stat import S_ISDIR, S_ISREG
from inspire_utils.record import get_value

import backoff
import pycountry
Expand All @@ -17,13 +19,11 @@
from common.constants import (
BY_PATTERN,
CDATA_PATTERN,
COUNTRIES_DEFAULT_MAPPING,
COUNTRY_PARSING_PATTERN,
CREATIVE_COMMONS_PATTERN,
LICENSE_PATTERN,
)
from common.constants import (
COUNTRIES_DEFAULT_MAPPING,
INSTITUTIONS_AND_COUNTRIES_MAPPING,
LICENSE_PATTERN,
)
from common.exceptions import (
FoundMoreThanOneMatchOrNone,
Expand Down Expand Up @@ -275,14 +275,13 @@ def create_or_update_article(data):
raise



def parse_country_from_value(affiliation_value):
for key, val in INSTITUTIONS_AND_COUNTRIES_MAPPING.items():
if re.search(r'\b%s\b' % key, affiliation_value, flags=re.IGNORECASE):
if re.search(r"\b%s\b" % key, affiliation_value, flags=re.IGNORECASE):
return val
country = affiliation_value.split(",")[-1].strip()
for key, val in COUNTRIES_DEFAULT_MAPPING.items():
if re.search(r'\b%s\b' % key, country, flags=re.IGNORECASE):
if re.search(r"\b%s\b" % key, country, flags=re.IGNORECASE):
return val

try:
Expand All @@ -299,8 +298,21 @@ def find_country_match_from_mapping(affiliation_value):
if re.search(r"\b%s\b" % key, affiliation_value, flags=re.IGNORECASE):
return COUNTRIES_DEFAULT_MAPPING[key]


def get_country_ISO_name(country):
    """Return the canonical country name from COUNTRIES_DEFAULT_MAPPING.

    Falls back to the input value unchanged when the mapping has no
    (truthy) entry for *country*.
    """
    mapped = COUNTRIES_DEFAULT_MAPPING.get(country)
    return mapped if mapped else country


def upload_json_to_s3(json_record, repo):
    """Serialize *json_record* to JSON and store it in the given repository.

    The object key has the form
    ``parsed/<YYYY-MM-DD>/<doi>__<YYYY-MM-DD_HH:MM:SS>.json``.

    :param json_record: dict with the parsed article metadata; must contain
        a DOI reachable via the ``dois.value[0]`` path (read with ``get_value``).
    :param repo: repository object exposing ``save(key, file_like)``.
    """
    file_in_bytes = io.BytesIO(json.dumps(json_record, indent=2).encode("utf-8"))
    # Bug fix: the module does `import datetime`, so `datetime.now()` raised
    # AttributeError (it must be `datetime.datetime.now()`), and calling
    # strftime("%H:%M:%S") on a bare `date` always produced 00:00:00.
    # Take one datetime and derive both the date folder and the timestamp.
    # NOTE(review): timestamp is server-local time — consider UTC; confirm.
    now = datetime.datetime.now()
    current_date_str = now.strftime("%Y-%m-%d")
    current_date_and_time_str = now.strftime("%Y-%m-%d_%H:%M:%S")
    doi = get_value(json_record, "dois.value[0]")
    file_key = os.path.join(
        "parsed", current_date_str, f"{doi}__{current_date_and_time_str}.json"
    )
    repo.save(file_key, file_in_bytes)
11 changes: 10 additions & 1 deletion dags/elsevier/elsevier_file_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article, parse_without_names_spaces
from common.utils import (
create_or_update_article,
parse_without_names_spaces,
upload_json_to_s3,
)
from elsevier.parser import ElsevierParser
from elsevier.repository import ElsevierRepository
from inspire_utils.record import get_value
Expand Down Expand Up @@ -76,6 +80,10 @@ def enrich(enhanced_file):
return enrich_elsevier(enhanced_file)
raise EmptyOutputFromPreviousTask("enhanced_file_with_metadata")

@task()
def save_to_s3(enriched_file):
    """Persist the enriched article record as a JSON file in the S3 repository."""
    upload_json_to_s3(enriched_file, s3_client)

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)
Expand All @@ -84,6 +92,7 @@ def create_or_update(enriched_file):
enhanced_file = enhance(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
10 changes: 9 additions & 1 deletion dags/hindawi/hindawi_file_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article
from common.utils import create_or_update_article, upload_json_to_s3
from hindawi.parser import HindawiParser
from hindawi.repository import HindawiRepository
from inspire_utils.record import get_value
from structlog import get_logger

Expand All @@ -29,6 +30,8 @@ def enrich_hindawi(enhanced_file):

@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def hindawi_file_processing():
s3_client = HindawiRepository()

@task()
def parse(**kwargs):
record = kwargs.get("params", {}).get("record")
Expand Down Expand Up @@ -68,6 +71,10 @@ def populate_files(parsed_file):
logger.info("Files populated", files=parsed_file["files"])
return parsed_file

@task()
def save_to_s3(enriched_file):
    """Persist the enriched article record as a JSON file in the S3 repository."""
    upload_json_to_s3(enriched_file, s3_client)

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)
Expand All @@ -76,6 +83,7 @@ def create_or_update(enriched_file):
enhanced_file = enhance(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
9 changes: 8 additions & 1 deletion dags/iop/iop_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article
from common.utils import create_or_update_article, upload_json_to_s3
from inspire_utils.record import get_value
from iop.parser import IOPParser
from iop.repository import IOPRepository
Expand Down Expand Up @@ -40,6 +40,8 @@ def iop_enrich_file(enhanced_file):

@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def iop_process_file():
s3_client = IOPRepository()

@task()
def parse_file(**kwargs):
return iop_parse_file(**kwargs)
Expand Down Expand Up @@ -78,10 +80,15 @@ def populate_files(parsed_file):
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

@task()
def save_to_s3(enriched_file):
    """Persist the enriched article record as a JSON file in the S3 repository."""
    upload_json_to_s3(enriched_file, s3_client)

parsed_file = parse_file()
enhanced_file = enhance_file(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich_file(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
13 changes: 12 additions & 1 deletion dags/oup/oup_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article, parse_without_names_spaces
from common.utils import (
create_or_update_article,
parse_without_names_spaces,
upload_json_to_s3,
)
from inspire_utils.record import get_value
from jsonschema import validate
from oup.parser import OUPParser
Expand Down Expand Up @@ -47,6 +51,8 @@ def oup_validate_record(enriched_file):

@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def oup_process_file():
s3_client = OUPRepository()

@task()
def parse_file(**kwargs):
return oup_parse_file(**kwargs)
Expand Down Expand Up @@ -85,10 +91,15 @@ def populate_files(parsed_file):
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

@task()
def save_to_s3(enriched_file):
    """Persist the enriched article record as a JSON file in the S3 repository."""
    upload_json_to_s3(enriched_file, s3_client)

parsed_file = parse_file()
enhanced_file = enhance_file(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich_file(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
9 changes: 8 additions & 1 deletion dags/springer/springer_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article
from common.utils import create_or_update_article, upload_json_to_s3
from inspire_utils.record import get_value
from jsonschema import validate
from springer.parser import SpringerParser
Expand Down Expand Up @@ -48,6 +48,8 @@ def springer_validate_record(enriched_file):

@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def springer_process_file():
s3_client = SpringerRepository()

@task()
def parse_file(**kwargs):
return springer_parse_file(**kwargs)
Expand Down Expand Up @@ -86,10 +88,15 @@ def populate_files(parsed_file):
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

@task()
def save_to_s3(enriched_file):
    """Persist the enriched article record as a JSON file in the S3 repository."""
    upload_json_to_s3(enriched_file, s3_client)

parsed_file = parse_file()
enhanced_file = enhance_file(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich_file(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
2 changes: 1 addition & 1 deletion tests/integration/aps/test_aps_file_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ def dag():

def test_dag_loaded(dag):
    """Smoke test: the DAG imports cleanly and registers all 6 tasks
    (including the added save_to_s3 task)."""
    # The superseded `== 5` assertion (a leftover diff line) is removed:
    # two contradictory equality asserts can never both pass.
    assert dag is not None
    assert len(dag.tasks) == 6
2 changes: 1 addition & 1 deletion tests/integration/iop/test_iop_dag_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def extract_zip_to_article(zip_filename):

def test_dag_loaded(dag):
    """Smoke test: the DAG imports cleanly and registers all 6 tasks
    (including the added save_to_s3 task)."""
    # The superseded `== 5` assertion (a leftover diff line) is removed:
    # two contradictory equality asserts can never both pass.
    assert dag is not None
    assert len(dag.tasks) == 6


def test_affiliation_countries_in_enriched(article):
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/oup/test_oup_dag_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def test_affiliation_countries_in_enriched(parser, articles):

def test_dag_loaded(dag):
    """Smoke test: the DAG imports cleanly and registers all 6 tasks
    (including the added save_to_s3 task)."""
    # The superseded `== 5` assertion (a leftover diff line) is removed:
    # two contradictory equality asserts can never both pass.
    assert dag
    assert len(dag.tasks) == 6


publisher = "OUP"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def extract_zip_to_article(zip_filename):

def test_dag_loaded(dag):
    """Smoke test: the DAG imports cleanly and registers all 6 tasks
    (including the added save_to_s3 task)."""
    # The superseded `== 5` assertion (a leftover diff line) is removed:
    # two contradictory equality asserts can never both pass.
    assert dag is not None
    assert len(dag.tasks) == 6


@pytest.mark.skip(reason="It does not test anything.")
Expand Down
Loading