Skip to content

Commit

Permalink
global: run create_or_update after record validation
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP authored Dec 1, 2023
1 parent d2f9e84 commit fefa1b5
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 23 deletions.
22 changes: 15 additions & 7 deletions dags/aps/aps_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from aps.parser import APSParser
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.utils import create_or_update_article
from jsonschema import validate

Expand All @@ -27,6 +28,7 @@ def enrich_aps(enhanced_file):
def aps_validate_record(enriched_file):
schema = requests.get(enriched_file["$schema"]).json()
validate(enriched_file, schema)
return enriched_file


@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
Expand All @@ -38,26 +40,32 @@ def parse(**kwargs):
return parse_aps(article)

@task()
def enchance(parsed_file):
return parsed_file and enhance_aps(parsed_file)
def enhance(parsed_file):
if not parsed_file:
raise EmptyOutputFromPreviousTask("parse")
return enhance_aps(parsed_file)

@task()
def enrich(enhanced_file):
return enhanced_file and enrich_aps(enhanced_file)
if not enhanced_file:
raise EmptyOutputFromPreviousTask("enhance")
return enrich_aps(enhanced_file)

@task()
def validate_record(enriched_file):
return enriched_file and aps_validate_record(enriched_file)
if enriched_file:
raise EmptyOutputFromPreviousTask("enrich")
return aps_validate_record(enriched_file)

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

parsed_file = parse()
enhanced_file = enchance(parsed_file)
enhanced_file = enhance(parsed_file)
enriched_file = enrich(enhanced_file)
validate_record(enriched_file)
create_or_update(enriched_file)
validated_record = validate_record(enriched_file)
create_or_update(validated_record)


dag_for_aps_files_processing = aps_process_file()
22 changes: 15 additions & 7 deletions dags/hindawi/hindawi_file_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from airflow.decorators import dag, task
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.utils import create_or_update_article
from hindawi.parser import HindawiParser
from jsonschema import validate
Expand All @@ -26,6 +27,7 @@ def enrich_hindawi(enhanced_file):
def hindawi_validate_record(enriched_file):
schema = requests.get(enriched_file["$schema"]).json()
validate(enriched_file, schema)
return enriched_file


@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
Expand All @@ -39,26 +41,32 @@ def parse(**kwargs):
return parse_hindawi(xml)

@task()
def enchance(parsed_file):
return parsed_file and enhance_hindawi(parsed_file)
def enhance(parsed_file):
if not parsed_file:
raise EmptyOutputFromPreviousTask("parse")
return enhance_hindawi(parsed_file)

@task()
def enrich(enhanced_file):
return enhanced_file and enrich_hindawi(enhanced_file)
if not enhanced_file:
raise EmptyOutputFromPreviousTask("enhance")
return enrich_hindawi(enhanced_file)

@task()
def validate_record(enriched_file):
return enriched_file and hindawi_validate_record(enriched_file)
if not enriched_file:
raise EmptyOutputFromPreviousTask("enrich")
return hindawi_validate_record(enriched_file)

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

parsed_file = parse()
enhanced_file = enchance(parsed_file)
enhanced_file = enhance(parsed_file)
enriched_file = enrich(enhanced_file)
validate_record(enriched_file)
create_or_update(enriched_file)
validated_record = validate_record(enriched_file)
create_or_update(validated_record)


Hindawi_file_processing = hindawi_file_processing()
14 changes: 11 additions & 3 deletions dags/iop/iop_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from airflow.decorators import dag, task
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.utils import create_or_update_article
from iop.parser import IOPParser
from jsonschema import validate
Expand Down Expand Up @@ -35,6 +36,7 @@ def iop_enrich_file(enhanced_file):
def iop_validate_record(enriched_file):
schema = requests.get(enriched_file["$schema"]).json()
validate(enriched_file, schema)
return enriched_file


@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
Expand All @@ -45,15 +47,21 @@ def parse_file(**kwargs):

@task()
def enhance_file(parsed_file):
if not parsed_file:
raise EmptyOutputFromPreviousTask("parse_file")
return iop_enhance_file(parsed_file)

@task()
def enrich_file(enhanced_file):
if not enhanced_file:
raise EmptyOutputFromPreviousTask("enhance_file")
return iop_enrich_file(enhanced_file)

@task()
def validate_record(enriched_file):
iop_validate_record(enriched_file)
if not enriched_file:
raise EmptyOutputFromPreviousTask("enrich_file")
return iop_validate_record(enriched_file)

@task()
def create_or_update(enriched_file):
Expand All @@ -62,8 +70,8 @@ def create_or_update(enriched_file):
parsed_file = parse_file()
enhanced_file = enhance_file(parsed_file)
enriched_file = enrich_file(enhanced_file)
validate_record(enriched_file)
create_or_update(enriched_file)
validated_record = validate_record(enriched_file)
create_or_update(validated_record)


dag_taskflow = iop_process_file()
14 changes: 11 additions & 3 deletions dags/oup/oup_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from airflow.decorators import dag, task
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.utils import create_or_update_article, parse_without_names_spaces
from jsonschema import validate
from oup.parser import OUPParser
Expand Down Expand Up @@ -34,6 +35,7 @@ def oup_enrich_file(enhanced_file):
def oup_validate_record(enriched_file):
schema = requests.get(enriched_file["$schema"]).json()
validate(enriched_file, schema)
return enriched_file


@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
Expand All @@ -44,15 +46,21 @@ def parse_file(**kwargs):

@task()
def enhance_file(parsed_file):
if not parsed_file:
raise EmptyOutputFromPreviousTask("parse_file")
return oup_enhance_file(parsed_file)

@task()
def enrich_file(enhanced_file):
if not enhanced_file:
raise EmptyOutputFromPreviousTask("enhance_file")
return oup_enrich_file(enhanced_file)

@task()
def validate_record(enriched_file):
oup_validate_record(enriched_file)
if not enriched_file:
raise EmptyOutputFromPreviousTask("enrich_file")
return oup_validate_record(enriched_file)

@task()
def create_or_update(enriched_file):
Expand All @@ -61,8 +69,8 @@ def create_or_update(enriched_file):
parsed_file = parse_file()
enhanced_file = enhance_file(parsed_file)
enriched_file = enrich_file(enhanced_file)
validate_record(enriched_file)
create_or_update(enriched_file)
validated_record = validate_record(enriched_file)
create_or_update(validated_record)


dag_taskflow = oup_process_file()
14 changes: 11 additions & 3 deletions dags/springer/dag_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from airflow.decorators import dag, task
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.utils import create_or_update_article
from jsonschema import validate
from springer.parser import SpringerParser
Expand Down Expand Up @@ -35,6 +36,7 @@ def springer_enrich_file(enhanced_file):
def springer_validate_record(enriched_file):
schema = requests.get(enriched_file["$schema"]).json()
validate(enriched_file, schema)
return enriched_file


@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
Expand All @@ -45,15 +47,21 @@ def parse_file(**kwargs):

@task()
def enhance_file(parsed_file):
if not parsed_file:
raise EmptyOutputFromPreviousTask("parse_file")
return springer_enhance_file(parsed_file)

@task()
def enrich_file(enhanced_file):
if not enhanced_file:
raise EmptyOutputFromPreviousTask("enhance_file")
return springer_enrich_file(enhanced_file)

@task()
def validate_record(enriched_file):
springer_validate_record(enriched_file)
if not enriched_file:
raise EmptyOutputFromPreviousTask("enrich_file")
return springer_validate_record(enriched_file)

@task()
def create_or_update(enriched_file):
Expand All @@ -62,8 +70,8 @@ def create_or_update(enriched_file):
parsed_file = parse_file()
enhanced_file = enhance_file(parsed_file)
enriched_file = enrich_file(enhanced_file)
validate_record(enriched_file)
create_or_update(enriched_file)
validated_record = validate_record(enriched_file)
create_or_update(validated_record)


dag_taskflow = springer_process_file()
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def test_case_instance():
def assertListEqual(test_case_instance):
return lambda first, second: test_case_instance.assertCountEqual(first, second)


@pytest.fixture(scope="session")
def vcr_config():
return {
Expand Down

0 comments on commit fefa1b5

Please sign in to comment.