Skip to content

Commit

Permalink
DAGs: saving json output to s3
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed May 14, 2024
1 parent 50ba8fb commit 02d1322
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 17 deletions.
10 changes: 9 additions & 1 deletion dags/aps/aps_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import pendulum
from airflow.decorators import dag, task
from aps.parser import APSParser
from aps.repository import APSRepository
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article
from common.utils import create_or_update_article, upload_json_to_s3
from inspire_utils.record import get_value
from structlog import get_logger

Expand All @@ -30,6 +31,8 @@ def enrich_aps(enhanced_file):

@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def aps_process_file():
s3_client = APSRepository()

@task()
def parse(**kwargs):
if "params" in kwargs and "article" in kwargs["params"]:
Expand Down Expand Up @@ -66,6 +69,10 @@ def populate_files(parsed_file):
logger.info("Files populated", files=parsed_file["files"])
return parsed_file

@task()
def save_to_s3(enriched_file):
upload_json_to_s3(json_record=enriched_file, repo=s3_client)

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)
Expand All @@ -74,6 +81,7 @@ def create_or_update(enriched_file):
enhanced_file = enhance(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
25 changes: 18 additions & 7 deletions dags/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import io
import json
import os
import re
Expand All @@ -17,13 +18,11 @@
from common.constants import (
BY_PATTERN,
CDATA_PATTERN,
COUNTRIES_DEFAULT_MAPPING,
COUNTRY_PARSING_PATTERN,
CREATIVE_COMMONS_PATTERN,
LICENSE_PATTERN,
)
from common.constants import (
COUNTRIES_DEFAULT_MAPPING,
INSTITUTIONS_AND_COUNTRIES_MAPPING,
LICENSE_PATTERN,
)
from common.exceptions import (
FoundMoreThanOneMatchOrNone,
Expand Down Expand Up @@ -275,14 +274,13 @@ def create_or_update_article(data):
raise



def parse_country_from_value(affiliation_value):
for key, val in INSTITUTIONS_AND_COUNTRIES_MAPPING.items():
if re.search(r'\b%s\b' % key, affiliation_value, flags=re.IGNORECASE):
if re.search(r"\b%s\b" % key, affiliation_value, flags=re.IGNORECASE):
return val
country = affiliation_value.split(",")[-1].strip()
for key, val in COUNTRIES_DEFAULT_MAPPING.items():
if re.search(r'\b%s\b' % key, country, flags=re.IGNORECASE):
if re.search(r"\b%s\b" % key, country, flags=re.IGNORECASE):
return val

try:
Expand All @@ -299,8 +297,21 @@ def find_country_match_from_mapping(affiliation_value):
if re.search(r"\b%s\b" % key, affiliation_value, flags=re.IGNORECASE):
return COUNTRIES_DEFAULT_MAPPING[key]


def get_country_ISO_name(country):
if COUNTRIES_DEFAULT_MAPPING.get(country):
return COUNTRIES_DEFAULT_MAPPING[country]
else:
return country


def upload_json_to_s3(repo, json_record):
file_in_bytes = io.BytesIO(json.dumps(json_record, indent=2).encode("utf-8"))
current_date = datetime.datetime.now().date()
current_date_str = current_date.strftime("%Y-%m-%d")
current_date_and_time_str = current_date.strftime("%Y-%m-%d_%H:%M:%S")
doi = json_record["dois"][0]["value"]
file_key = os.path.join(
"parsed", current_date_str, f"{doi}__{current_date_and_time_str}.json"
)
repo.save(file_key, file_in_bytes)
11 changes: 10 additions & 1 deletion dags/elsevier/elsevier_file_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article, parse_without_names_spaces
from common.utils import (
create_or_update_article,
parse_without_names_spaces,
upload_json_to_s3,
)
from elsevier.parser import ElsevierParser
from elsevier.repository import ElsevierRepository
from inspire_utils.record import get_value
Expand Down Expand Up @@ -76,6 +80,10 @@ def enrich(enhanced_file):
return enrich_elsevier(enhanced_file)
raise EmptyOutputFromPreviousTask("enhanced_file_with_metadata")

@task()
def save_to_s3(enriched_file):
upload_json_to_s3(json_record=enriched_file, repo=s3_client)

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)
Expand All @@ -84,6 +92,7 @@ def create_or_update(enriched_file):
enhanced_file = enhance(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
10 changes: 9 additions & 1 deletion dags/hindawi/hindawi_file_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article
from common.utils import create_or_update_article, upload_json_to_s3
from hindawi.parser import HindawiParser
from hindawi.repository import HindawiRepository
from inspire_utils.record import get_value
from structlog import get_logger

Expand All @@ -29,6 +30,8 @@ def enrich_hindawi(enhanced_file):

@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def hindawi_file_processing():
s3_client = HindawiRepository()

@task()
def parse(**kwargs):
record = kwargs.get("params", {}).get("record")
Expand Down Expand Up @@ -68,6 +71,10 @@ def populate_files(parsed_file):
logger.info("Files populated", files=parsed_file["files"])
return parsed_file

@task()
def save_to_s3(enriched_file):
upload_json_to_s3(json_record=enriched_file, repo=s3_client)

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)
Expand All @@ -76,6 +83,7 @@ def create_or_update(enriched_file):
enhanced_file = enhance(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
9 changes: 8 additions & 1 deletion dags/iop/iop_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article
from common.utils import create_or_update_article, upload_json_to_s3
from inspire_utils.record import get_value
from iop.parser import IOPParser
from iop.repository import IOPRepository
Expand Down Expand Up @@ -40,6 +40,8 @@ def iop_enrich_file(enhanced_file):

@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def iop_process_file():
s3_client = IOPRepository()

@task()
def parse_file(**kwargs):
return iop_parse_file(**kwargs)
Expand Down Expand Up @@ -78,10 +80,15 @@ def populate_files(parsed_file):
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

@task()
def save_to_s3(enriched_file):
upload_json_to_s3(json_record=enriched_file, repo=s3_client)

parsed_file = parse_file()
enhanced_file = enhance_file(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich_file(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
13 changes: 12 additions & 1 deletion dags/oup/oup_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article, parse_without_names_spaces
from common.utils import (
create_or_update_article,
parse_without_names_spaces,
upload_json_to_s3,
)
from inspire_utils.record import get_value
from jsonschema import validate
from oup.parser import OUPParser
Expand Down Expand Up @@ -47,6 +51,8 @@ def oup_validate_record(enriched_file):

@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def oup_process_file():
s3_client = OUPRepository()

@task()
def parse_file(**kwargs):
return oup_parse_file(**kwargs)
Expand Down Expand Up @@ -85,10 +91,15 @@ def populate_files(parsed_file):
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

@task()
def save_to_s3(enriched_file):
upload_json_to_s3(json_record=enriched_file, repo=s3_client)

parsed_file = parse_file()
enhanced_file = enhance_file(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich_file(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
9 changes: 8 additions & 1 deletion dags/springer/springer_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article
from common.utils import create_or_update_article, upload_json_to_s3
from inspire_utils.record import get_value
from jsonschema import validate
from springer.parser import SpringerParser
Expand Down Expand Up @@ -48,6 +48,8 @@ def springer_validate_record(enriched_file):

@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def springer_process_file():
s3_client = SpringerRepository()

@task()
def parse_file(**kwargs):
return springer_parse_file(**kwargs)
Expand Down Expand Up @@ -86,10 +88,15 @@ def populate_files(parsed_file):
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

@task()
def save_to_s3(enriched_file):
upload_json_to_s3(json_record=enriched_file, repo=s3_client)

parsed_file = parse_file()
enhanced_file = enhance_file(parsed_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich_file(enhanced_file_with_files)
save_to_s3(enriched_file=enriched_file)
create_or_update(enriched_file)


Expand Down
2 changes: 1 addition & 1 deletion tests/integration/aps/test_aps_file_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ def dag():

def test_dag_loaded(dag):
assert dag is not None
assert len(dag.tasks) == 5
assert len(dag.tasks) == 6
2 changes: 1 addition & 1 deletion tests/integration/iop/test_iop_dag_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def extract_zip_to_article(zip_filename):

def test_dag_loaded(dag):
assert dag is not None
assert len(dag.tasks) == 5
assert len(dag.tasks) == 6


def test_affiliation_countries_in_enriched(article):
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/oup/test_oup_dag_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def test_affiliation_countries_in_enriched(parser, articles):

def test_dag_loaded(dag):
assert dag
assert len(dag.tasks) == 5
assert len(dag.tasks) == 6


publisher = "OUP"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def extract_zip_to_article(zip_filename):

def test_dag_loaded(dag):
assert dag is not None
assert len(dag.tasks) == 5
assert len(dag.tasks) == 6


@pytest.mark.skip(reason="It does not test anything.")
Expand Down

0 comments on commit 02d1322

Please sign in to comment.